Author: rhuijben
Date: Mon Nov 16 16:06:05 2015
New Revision: 1714613

URL: http://svn.apache.org/viewvc?rev=1714613&view=rev
Log:
Make the http2 protocol capable of sending a response on an incoming request.
This adds basic body write support for this direction.

* protocols/http2_protocol.c
  (serf_http2__max_payload_size,
   serf_http2__alloc_window,
   serf_http2__return_window): New function.

* protocols/http2_protocol.h
  (serf_http2__max_payload_size,
   serf_http2__alloc_window,
   serf_http2__return_window): New function.

* protocols/http2_stream.c
  (includes): Add apr_strings.h.
  (serf_http2_stream_data_t): Add more state.
  (serf_http2__stream_create): Init state.

  (stream_send_headers): New function, extracted from...
  (serf_http2__stream_setup_next_request): ... here, which is now a caller.

  (window_allocate_info_t): New struct.
  (data_write_started,
   data_write_done): New function.
  (stream_send_body): New function, based on some parts of stream_send_headers.
  (stream_send_headers): New function.
  (http2_stream_enqueue_response): Implement function.
  (serf_http2__stream_handle_hpack): Cache hpack table, as needed for response.
  (serf_http2__stream_processor): Remove duplicate definition of variable.
* test/test_server.c

Modified:
    serf/trunk/protocols/http2_protocol.c
    serf/trunk/protocols/http2_protocol.h
    serf/trunk/protocols/http2_stream.c
    serf/trunk/test/test_server.c

Modified: serf/trunk/protocols/http2_protocol.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.c?rev=1714613&r1=1714612&r2=1714613&view=diff
==============================================================================
--- serf/trunk/protocols/http2_protocol.c (original)
+++ serf/trunk/protocols/http2_protocol.c Mon Nov 16 16:06:05 2015
@@ -1885,3 +1885,38 @@ serf_http2__setup_incoming_request(serf_
 
     return APR_SUCCESS;
 }
+
+apr_size_t
+serf_http2__max_payload_size(serf_http2_protocol_t *h2)
+{
+    return h2->lr_max_framesize;
+}
+
+apr_size_t serf_http2__alloc_window(serf_http2_protocol_t *h2,
+                                    serf_http2_stream_t *stream,
+                                    apr_size_t requested)
+{
+    if (requested > h2->lr_max_framesize)
+        requested = h2->lr_max_framesize;
+    if (requested > h2->lr_window)
+        requested = h2->lr_window;
+    if (requested > stream->lr_window)
+        requested = stream->lr_window;
+
+    if (requested) {
+        h2->lr_window -= requested;
+        stream->lr_window -= requested;
+    }
+
+    return requested;
+}
+
+void serf_http2__return_window(serf_http2_protocol_t *h2,
+                               serf_http2_stream_t *stream,
+                                apr_size_t returned)
+{
+    SERF_H2_assert(h2->lr_window + returned <= HTTP2_WINDOW_MAX_ALLOWED);
+    SERF_H2_assert(stream->lr_window + returned <= HTTP2_WINDOW_MAX_ALLOWED);
+    h2->lr_window += returned;
+    stream->lr_window += returned;
+}

Modified: serf/trunk/protocols/http2_protocol.h
URL: 
http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.h?rev=1714613&r1=1714612&r2=1714613&view=diff
==============================================================================
--- serf/trunk/protocols/http2_protocol.h (original)
+++ serf/trunk/protocols/http2_protocol.h Mon Nov 16 16:06:05 2015
@@ -204,6 +204,18 @@ serf_http2__setup_incoming_request(serf_
                                    void **req_setup_baton,
                                    serf_http2_protocol_t *h2);
 
+/* Gets current l->r max payload size */
+apr_size_t
+serf_http2__max_payload_size(serf_http2_protocol_t *h2);
+
+apr_size_t serf_http2__alloc_window(serf_http2_protocol_t *h2,
+                                    serf_http2_stream_t *stream,
+                                    apr_size_t requested);
+
+void serf_http2__return_window(serf_http2_protocol_t *h2,
+                               serf_http2_stream_t *stream,
+                               apr_size_t returned);
+
 apr_status_t
 serf_http2__stream_reset(serf_http2_stream_t *stream,
                          apr_status_t reason,

Modified: serf/trunk/protocols/http2_stream.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/protocols/http2_stream.c?rev=1714613&r1=1714612&r2=1714613&view=diff
==============================================================================
--- serf/trunk/protocols/http2_stream.c (original)
+++ serf/trunk/protocols/http2_stream.c Mon Nov 16 16:06:05 2015
@@ -21,6 +21,7 @@
 #include <stdlib.h>
 
 #include <apr_pools.h>
+#include <apr_strings.h>
 
 #include "serf.h"
 #include "serf_bucket_util.h"
@@ -34,6 +35,8 @@ struct serf_http2_stream_data_t
     serf_request_t *request; /* May be NULL as streams may outlive requests */
     serf_incoming_request_t *in_request;
     serf_bucket_t *response_agg;
+    serf_hpack_table_t *tbl;
+    serf_bucket_t *body_tail;
 };
 
 serf_http2_stream_t *
@@ -56,6 +59,8 @@ serf_http2__stream_create(serf_http2_pro
     stream->data->request = NULL;
     stream->data->in_request = NULL;
     stream->data->response_agg = NULL;
+    stream->data->tbl = NULL;
+    stream->data->body_tail = NULL;
 
     stream->lr_window = lr_window;
     stream->rl_window = rl_window;
@@ -84,6 +89,195 @@ serf_http2__stream_cleanup(serf_http2_st
     serf_bucket_mem_free(stream->alloc, stream);
 }
 
+static apr_status_t stream_send_headers(serf_http2_stream_t *stream,
+                                        serf_bucket_t *hpack,
+                                        apr_size_t max_payload_size,
+                                        bool end_stream)
+{
+    apr_status_t status;
+    bool first_frame = true;
+
+    /* And now schedule the packet for writing. Note that it is required
+    by the HTTP/2 spec to send HEADERS and CONTINUATION directly after
+    each other, without other frames inbetween. */
+    while (hpack != NULL)
+    {
+        serf_bucket_t *next;
+        apr_uint64_t remaining;
+
+        /* hpack buckets implement get_remaining. And if they didn't adding the
+        framing around them would apply some reads that fix the buckets.
+
+        So we can ignore the theoretical endless loop here for two different
+        reasons
+        */
+        remaining = serf_bucket_get_remaining(hpack);
+
+        if (remaining > max_payload_size) {
+            serf_bucket_split_create(&next, &hpack, hpack,
+                                     max_payload_size - (max_payload_size / 4),
+                                     max_payload_size);
+        }
+        else
+        {
+            next = hpack;
+            hpack = NULL;
+        }
+
+        next = serf__bucket_http2_frame_create(next,
+                                               first_frame
+                                               ? HTTP2_FRAME_TYPE_HEADERS
+                                               : HTTP2_FRAME_TYPE_CONTINUATION,
+                                               (end_stream
+                                                ? HTTP2_FLAG_END_STREAM
+                                                : 0)
+                                               | ((hpack != NULL)
+                                                  ? 0
+                                                  : HTTP2_FLAG_END_HEADERS),
+                                               &stream->streamid,
+                                               serf_http2__allocate_stream_id,
+                                               stream,
+                                               max_payload_size,
+                                               next->allocator);
+        status = serf_http2__enqueue_frame(stream->h2, next, TRUE);
+
+        if (SERF_BUCKET_READ_ERROR(status))
+            return status; /* Connection dead */
+
+        first_frame = false; /* Continue with 'continuation' frames */
+    }
+
+    return APR_SUCCESS;
+}
+
+typedef struct window_allocate_info_t
+{
+    serf_http2_stream_t *stream;
+    serf_bucket_t *bkt;
+    apr_size_t allocated;
+} window_allocate_info_t;
+
+static apr_status_t data_write_started(void *baton,
+                                       apr_uint64_t bytes_read)
+{
+    window_allocate_info_t *wai = baton;
+
+    bytes_read = serf_bucket_get_remaining(wai->bkt);
+
+    /* Handles unavailable for free */
+    if (bytes_read <= wai->allocated) {
+        /* Nice, we can return something now */
+        apr_size_t to_much = (wai->allocated - (apr_size_t)bytes_read);
+
+        serf_http2__return_window(wai->stream->h2, wai->stream, to_much);
+
+        wai->allocated = 0;
+    }
+    return APR_SUCCESS;
+}
+
+static apr_status_t data_write_done(void *baton,
+                                    apr_uint64_t bytes_read)
+{
+    window_allocate_info_t *wai = baton;
+
+    if (wai->allocated && bytes_read <= wai->allocated) {
+        /* Nice, we can return something now */
+        apr_size_t to_much = (wai->allocated - (apr_size_t)bytes_read);
+        wai->stream->lr_window += to_much;
+
+        serf_http2__return_window(wai->stream->h2, wai->stream, to_much);
+
+        wai->allocated = 0;
+    }
+
+    serf_bucket_mem_free(wai->stream->alloc, wai);
+    return APR_SUCCESS;
+}
+
+
+static apr_status_t stream_send_body(serf_http2_stream_t *stream,
+                                     serf_bucket_t *body)
+{
+    apr_uint64_t remaining;
+    serf_bucket_t *next;
+    apr_size_t prefix_len;
+    bool end_stream;
+    apr_status_t status;
+
+    SERF_H2_assert(stream->status == H2S_OPEN
+                   || stream->status == H2S_HALFCLOSED_REMOTE);
+    SERF_H2_assert(!stream->data->body_tail || (body ==
+                                                stream->data->body_tail));
+
+    /* Sending DATA frames over HTTP/2 is not easy as this usually requires
+       handling windowing, priority, etc. This code will improve over time */
+
+    if (!body)
+        remaining = 0;
+    else
+        remaining = serf_bucket_get_remaining(body);
+
+    /* If the stream decided we are already done */
+    if (remaining == 0) {
+        if (stream->status == H2S_OPEN)
+            stream->status = H2S_HALFCLOSED_LOCAL;
+        else
+            stream->status = H2S_CLOSED;
+
+        next = serf__bucket_http2_frame_create(NULL, HTTP2_FRAME_TYPE_DATA,
+                                               HTTP2_FLAG_END_STREAM,
+                                               &stream->streamid,
+                                               serf_http2__allocate_stream_id,
+                                               stream, 0, stream->alloc);
+        stream->data->body_tail = NULL;
+        return serf_http2__enqueue_frame(stream->h2, next, false);
+    }
+
+    prefix_len = serf_http2__alloc_window(stream->h2, stream,
+                                          (remaining >= APR_SIZE_MAX)
+                                          ? SERF_READ_ALL_AVAIL
+                                          : (apr_size_t)remaining);
+
+    if (prefix_len == 0) {
+        /* No window left */
+        stream->data->body_tail = body;
+        return APR_SUCCESS;
+    }
+
+    if (prefix_len < remaining) {
+        window_allocate_info_t *wai;
+        serf_bucket_split_create(&body, &stream->data->body_tail, body,
+                                 MIN(prefix_len, 1024), prefix_len);
+
+        wai = serf_bucket_mem_alloc(stream->alloc, sizeof(*wai));
+        wai->stream = stream;
+        wai->allocated = prefix_len;
+
+        body = serf__bucket_event_create(body, wai,
+                                         data_write_started,
+                                         data_write_done, NULL, stream->alloc);
+        end_stream = false;
+    }
+    else
+        end_stream = true;
+
+    next = serf__bucket_http2_frame_create(body, HTTP2_FRAME_TYPE_DATA,
+                                           end_stream ? HTTP2_FLAG_END_STREAM
+                                                      : 0,
+                                           &stream->streamid,
+                                           serf_http2__allocate_stream_id,
+                                           stream, prefix_len,
+                                           body->allocator);
+
+    status = serf_http2__enqueue_frame(stream->h2, next, TRUE);
+
+    if (!end_stream)
+        return APR_ENOTIMPL; /* The rest of the body won't be sent yet */
+
+    return status;
+}
+
 apr_status_t
 serf_http2__stream_setup_next_request(serf_http2_stream_t *stream,
                                       serf_connection_t *conn,
@@ -94,7 +288,6 @@ serf_http2__stream_setup_next_request(se
     apr_status_t status;
     serf_bucket_t *hpack;
     serf_bucket_t *body;
-    bool first_frame;
     bool end_stream;
 
     SERF_H2_assert(request != NULL);
@@ -137,58 +330,10 @@ serf_http2__stream_setup_next_request(se
     else
         end_stream = false;
 
-    first_frame = true;
-
-    /* And now schedule the packet for writing. Note that it is required
-       by the HTTP/2 spec to send HEADERS and CONTINUATION directly after
-       each other, without other frames inbetween. */
-    while (hpack != NULL)
-    {
-        serf_bucket_t *next;
-        apr_uint64_t remaining;
-
-        /* hpack buckets implement get_remaining. And if they didn't adding the
-           framing around them would apply some reads that fix the buckets.
-
-           So we can ignore the theoretical endless loop here for two different
-           reasons
-         */
-        remaining = serf_bucket_get_remaining(hpack);
-
-        if (remaining > max_payload_size) {
-            serf_bucket_split_create(&next, &hpack, hpack,
-                                     max_payload_size - (max_payload_size / 4),
-                                     max_payload_size);
-        }
-        else
-        {
-            next = hpack;
-            hpack = NULL;
-        }
-
-        next = serf__bucket_http2_frame_create(next,
-                                               first_frame
-                                               ? HTTP2_FRAME_TYPE_HEADERS
-                                               : HTTP2_FRAME_TYPE_CONTINUATION,
-                                               (end_stream
-                                                ? HTTP2_FLAG_END_STREAM
-                                                : 0)
-                                               | ((hpack != NULL)
-                                                  ? 0
-                                                  : HTTP2_FLAG_END_HEADERS),
-                                               &stream->streamid,
-                                               serf_http2__allocate_stream_id,
-                                               stream,
-                                               max_payload_size,
-                                               request->allocator);
-        status = serf_http2__enqueue_frame(stream->h2, next, TRUE);
-
-        if (SERF_BUCKET_READ_ERROR(status))
-            return status; /* Connection dead */
-
-        first_frame = false; /* Continue with 'continuation' frames */
-    }
-
+    status = stream_send_headers(stream, hpack, max_payload_size,
+                                 end_stream);
+    if (status)
+        return status;
 
     if (end_stream) {
         stream->status = H2S_HALFCLOSED_LOCAL; /* Headers sent; no body */
@@ -235,14 +380,69 @@ stream_response_eof(void *baton,
     }
 }
 
+static int set_hpack_header(void *baton,
+                            const char *key,
+                            const char *value)
+{
+    serf_bucket_t *hpack = baton;
+
+    serf__bucket_hpack_setc(hpack, key, value);
+    return 0;
+}
+
 static apr_status_t
 http2_stream_enqueue_response(serf_incoming_request_t *request,
                               void *enqueue_baton,
                               serf_bucket_t *response_bkt)
 {
     serf_http2_stream_t *stream = enqueue_baton;
+    serf_bucket_t *hpack;
+    serf_bucket_t *headers;
+    serf_bucket_t *h1_response;
+    serf_status_line sline;
+    apr_status_t status;
+
+    /* OK, this could be implemented using knowledge of the buckets, in
+       a 100% more efficient, but I don't want to introduce new bucket
+       types for this yet. Let's just read everything the http/1 way
+       and put it in HTTP/2 appropriate places */
+    h1_response = serf_bucket_response_create(response_bkt,
+                                              stream->alloc);
+    do
+    {
+        status = serf_bucket_response_status(h1_response, &sline);
+    } while (status != APR_SUCCESS);
+
+    if (status != APR_SUCCESS)
+        return APR_EGENERAL; /* Can't read statusline. No EAGAIN support before
+                                the body (yet) */
+
+    hpack = serf__bucket_hpack_create(stream->data->tbl, stream->alloc);
+    serf__bucket_hpack_setc(hpack, ":status",
+                            apr_itoa(stream->data->in_request->pool,
+                                     sline.code));
 
-    return APR_ENOTIMPL;
+    do
+    {
+        status = serf_bucket_response_wait_for_headers(h1_response);
+    } while (status != APR_SUCCESS);
+
+    if (status != APR_SUCCESS)
+        return APR_EGENERAL; /* Can't read body. No EAGAIN support before
+                                the body (yet) */
+
+    headers = serf_bucket_response_get_headers(h1_response);
+
+    serf_bucket_headers_do(headers, set_hpack_header, hpack);
+
+    status = stream_send_headers(stream, hpack,
+                                 serf_http2__max_payload_size(stream->h2),
+                                 false);
+
+    if (status)
+        return status;
+
+    return stream_send_body(stream, response_bkt);
 }
 
 static apr_status_t
@@ -380,6 +580,8 @@ serf_http2__stream_handle_hpack(serf_htt
         if (!stream->data->response_agg)
             stream_setup_response(stream, config);
 
+        stream->data->tbl = hpack_tbl;
+
         bucket = serf__bucket_hpack_decode_create(bucket, NULL, NULL,
                                                   max_entry_size,
                                                   hpack_tbl, allocator);
@@ -530,7 +732,7 @@ serf_http2__stream_processor(void *baton
             return status;
 
         if (APR_STATUS_IS_EOF(status)) {
-            apr_status_t status = serf_incoming_response_create(request);
+            status = serf_incoming_response_create(request);
 
             if (status)
                 return status;

Modified: serf/trunk/test/test_server.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/test/test_server.c?rev=1714613&r1=1714612&r2=1714613&view=diff
==============================================================================
--- serf/trunk/test/test_server.c (original)
+++ serf/trunk/test/test_server.c Mon Nov 16 16:06:05 2015
@@ -238,7 +238,7 @@ void test_listen_http2(CuTest *tc)
 
     status = run_client_server_loop(tb, num_requests,
                                     handler_ctx, tb->pool);
-    CuAssertIntEquals(tc, APR_ENOTIMPL, status);
+    CuAssertIntEquals(tc, APR_SUCCESS, status);
 }
 
 /*****************************************************************************/


Reply via email to