Author: rhuijben
Date: Mon Nov 16 17:12:05 2015
New Revision: 1714631

URL: http://svn.apache.org/viewvc?rev=1714631&view=rev
Log:
Add initial generic support for sending http2 request and response bodies.

In theory this should make our http/2 support fully functional, although
I expect that we will need quite some tweaking to make this production
ready.

This patch implements a simple linked list of streams that still have
writing to do and walks over them until it finds one that can be written
given the window rules.

This code may need a lot of improvements to fully support the http/2
priority queueing, but the current implementation matches the
expectations of a limited server/client within the specs.
(Improvement is optional).

* protocols/http2_protocol.c
  (serf_http2_protocol_t): Add linked list of writable requests.
  (http2_write_data): New function.
  (http2_outgoing_write,
   http2_incoming_write): Call http2_write_data when buffered data was
     flushed.
  (serf_http2__ensure_writable): New function.

* protocols/http2_protocol.h
  (serf_http2_stream_t): Add fields.

* protocols/http2_stream.c
  (serf_http2_stream_data_t): Rename body to data.
  (serf_http2__stream_create): Initialize fields.
  (stream_send_body): Rename to...
  (stream_send_data): ... this.
  (serf_http2__stream_write_data): New function.
  (destroy_request_bucket): New function.
  (serf_http2__stream_setup_next_request): Schedule body writing.
  (http2_stream_enqueue_response): Update caller.

* test/test_server.c
  (test_listen_http2): Send bodies, like the http1 test.

Modified:
    serf/trunk/protocols/http2_protocol.c
    serf/trunk/protocols/http2_protocol.h
    serf/trunk/protocols/http2_stream.c
    serf/trunk/test/test_server.c

Modified: serf/trunk/protocols/http2_protocol.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.c?rev=1714631&r1=1714630&r2=1714631&view=diff
==============================================================================
--- serf/trunk/protocols/http2_protocol.c (original)
+++ serf/trunk/protocols/http2_protocol.c Mon Nov 16 17:12:05 2015
@@ -160,6 +160,8 @@ struct serf_http2_protocol_t
 
     serf_bucket_t *continuation_bucket;
     apr_int32_t continuation_streamid;
+
+    serf_http2_stream_t *first_writable, *last_writable, *cur_writable;
 };
 
 /* Forward definition */
@@ -1595,6 +1597,47 @@ http2_process(serf_http2_protocol_t *h2)
     } /* while(TRUE) */
 }
 
+static apr_status_t http2_write_data(serf_http2_protocol_t *h2)
+{
+    serf_http2_stream_t *stream = h2->cur_writable;
+
+    while (TRUE)
+    {
+        apr_status_t status;
+
+        if (!stream)
+            stream = h2->first_writable;
+
+        if (!stream)
+            return APR_SUCCESS;
+
+        if (stream->status != H2S_OPEN
+            && stream->status != H2S_HALFCLOSED_REMOTE)
+        {
+            /* This stream is NOT writable. Remove it */
+            if (stream->prev_writable)
+                stream->prev_writable->next_writable = stream->next_writable;
+            else
+                h2->first_writable = stream->next_writable;
+
+            if (stream->next_writable)
+                stream->next_writable->prev_writable = stream->prev_writable;
+            else
+                h2->last_writable = stream->prev_writable;
+
+            stream = stream->next_writable;
+            continue;
+        }
+
+        status = serf_http2__stream_write_data(stream);
+        if (status || stream->lr_window == 0)
+            h2->cur_writable = stream->next_writable;
+
+        return status ? status : APR_EAGAIN;
+    }
+
+}
+
 static apr_status_t
 http2_outgoing_read(serf_connection_t *conn)
 {
@@ -1649,6 +1692,10 @@ http2_outgoing_write(serf_connection_t *
     else if (status)
         return status;
 
+    status = http2_write_data(h2);
+    if (status)
+        return status;
+
       /* Probably nothing to write. Connection will check new requests */
     conn->dirty_conn = 1;
     conn->ctx->dirty_pollset = 1;
@@ -1754,7 +1801,7 @@ http2_incoming_read(serf_incoming_t *cli
 static apr_status_t
 http2_incoming_write(serf_incoming_t *client)
 {
-    /* serf_http2_protocol_t *h2 = client->protocol_baton; */
+    serf_http2_protocol_t *h2 = client->protocol_baton;
     apr_status_t status;
 
     status = serf__incoming_client_flush(client, TRUE);
@@ -1764,6 +1811,10 @@ http2_incoming_write(serf_incoming_t *cl
     else if (status)
         return status;
 
+    status = http2_write_data(h2);
+    if (status)
+        return status;
+
     /* Probably nothing to write. Connection will check new requests */
     client->dirty_conn = true;
     client->ctx->dirty_pollset = true;
@@ -1952,3 +2003,20 @@ void serf_http2__return_window(serf_http
     h2->lr_window += returned;
     stream->lr_window += returned;
 }
+
+void serf_http2__ensure_writable(serf_http2_stream_t *stream)
+{
+    serf_http2_protocol_t *h2 = stream->h2;
+    SERF_H2_assert(stream->status == H2S_OPEN
+                   || stream->status == H2S_HALFCLOSED_REMOTE);
+
+    if (stream->next_writable || stream->prev_writable)
+        return;
+
+    stream->prev_writable = h2->last_writable;
+    h2->last_writable = stream;
+    if (h2->first_writable)
+        h2->last_writable->next_writable = stream;
+    else
+        h2->first_writable = stream;
+}

Modified: serf/trunk/protocols/http2_protocol.h
URL: 
http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.h?rev=1714631&r1=1714630&r2=1714631&view=diff
==============================================================================
--- serf/trunk/protocols/http2_protocol.h (original)
+++ serf/trunk/protocols/http2_protocol.h Mon Nov 16 17:12:05 2015
@@ -146,6 +146,7 @@ typedef struct serf_http2_stream_t
   struct serf_http2_stream_t *new_reserved_stream;
 
   /* TODO: Priority, etc. */
+  struct serf_http2_stream_t *next_writable, *prev_writable;
 } serf_http2_stream_t;
 
 typedef apr_status_t (* serf_http2_processor_t)(void *baton,
@@ -216,6 +217,8 @@ void serf_http2__return_window(serf_http
                                serf_http2_stream_t *stream,
                                apr_size_t returned);
 
+void serf_http2__ensure_writable(serf_http2_stream_t *stream);
+
 apr_status_t
 serf_http2__stream_reset(serf_http2_stream_t *stream,
                          apr_status_t reason,
@@ -244,6 +247,9 @@ serf_http2__stream_processor(void *baton
                              serf_http2_protocol_t *h2,
                              serf_bucket_t *bucket);
 
+apr_status_t
+serf_http2__stream_write_data(serf_http2_stream_t *stream);
+
 #ifdef __cplusplus
 }
 #endif

Modified: serf/trunk/protocols/http2_stream.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/protocols/http2_stream.c?rev=1714631&r1=1714630&r2=1714631&view=diff
==============================================================================
--- serf/trunk/protocols/http2_stream.c (original)
+++ serf/trunk/protocols/http2_stream.c Mon Nov 16 17:12:05 2015
@@ -36,7 +36,7 @@ struct serf_http2_stream_data_t
     serf_incoming_request_t *in_request;
     serf_bucket_t *response_agg;
     serf_hpack_table_t *tbl;
-    serf_bucket_t *body_tail;
+    serf_bucket_t *data_tail;
 };
 
 serf_http2_stream_t *
@@ -60,7 +60,7 @@ serf_http2__stream_create(serf_http2_pro
     stream->data->in_request = NULL;
     stream->data->response_agg = NULL;
     stream->data->tbl = NULL;
-    stream->data->body_tail = NULL;
+    stream->data->data_tail = NULL;
 
     stream->lr_window = lr_window;
     stream->rl_window = rl_window;
@@ -73,6 +73,8 @@ serf_http2__stream_create(serf_http2_pro
     stream->status = (streamid >= 0) ? H2S_IDLE : H2S_INIT;
     stream->new_reserved_stream = NULL;
 
+    stream->prev_writable = stream->next_writable = NULL;
+
     return stream;
 }
 
@@ -196,8 +198,8 @@ static apr_status_t data_write_done(void
 }
 
 
-static apr_status_t stream_send_body(serf_http2_stream_t *stream,
-                                     serf_bucket_t *body)
+static apr_status_t stream_send_data(serf_http2_stream_t *stream,
+                                     serf_bucket_t *data)
 {
     apr_uint64_t remaining;
     serf_bucket_t *next;
@@ -207,16 +209,16 @@ static apr_status_t stream_send_body(ser
 
     SERF_H2_assert(stream->status == H2S_OPEN
                    || stream->status == H2S_HALFCLOSED_REMOTE);
-    SERF_H2_assert(!stream->data->body_tail || (body ==
-                                                stream->data->body_tail));
+    SERF_H2_assert(!stream->data->data_tail || (data ==
+                                                stream->data->data_tail));
 
     /* Sending DATA frames over HTTP/2 is not easy as this usually requires
        handling windowing, priority, etc. This code will improve over time */
 
-    if (!body)
+    if (!data)
         remaining = 0;
     else
-        remaining = serf_bucket_get_remaining(body);
+        remaining = serf_bucket_get_remaining(data);
 
     /* If the stream decided we are already done */
     if (remaining == 0) {
@@ -225,12 +227,14 @@ static apr_status_t stream_send_body(ser
         else
             stream->status = H2S_CLOSED;
 
+        serf_bucket_destroy(data);
+
         next = serf__bucket_http2_frame_create(NULL, HTTP2_FRAME_TYPE_DATA,
                                                HTTP2_FLAG_END_STREAM,
                                                &stream->streamid,
                                                serf_http2__allocate_stream_id,
                                                stream, 0, stream->alloc);
-        stream->data->body_tail = NULL;
+        stream->data->data_tail = NULL;
         return serf_http2__enqueue_frame(stream->h2, next, false);
     }
 
@@ -241,20 +245,20 @@ static apr_status_t stream_send_body(ser
 
     if (prefix_len == 0) {
         /* No window left */
-        stream->data->body_tail = body;
+        stream->data->data_tail = data;
         return APR_SUCCESS;
     }
 
     if (prefix_len < remaining) {
         window_allocate_info_t *wai;
-        serf_bucket_split_create(&body, &stream->data->body_tail, body,
+        serf_bucket_split_create(&data, &stream->data->data_tail, data,
                                  MIN(prefix_len, 1024), prefix_len);
 
         wai = serf_bucket_mem_alloc(stream->alloc, sizeof(*wai));
         wai->stream = stream;
         wai->allocated = prefix_len;
 
-        body = serf__bucket_event_create(body, wai,
+        data = serf__bucket_event_create(data, wai,
                                          data_write_started,
                                          data_write_done, NULL, stream->alloc);
         end_stream = false;
@@ -262,23 +266,46 @@ static apr_status_t stream_send_body(ser
     else
         end_stream = true;
 
-    next = serf__bucket_http2_frame_create(body, HTTP2_FRAME_TYPE_DATA,
+    next = serf__bucket_http2_frame_create(data, HTTP2_FRAME_TYPE_DATA,
                                            end_stream ? HTTP2_FLAG_END_STREAM
                                                       : 0,
                                            &stream->streamid,
                                            serf_http2__allocate_stream_id,
                                            stream, prefix_len,
-                                           body->allocator);
+                                           data->allocator);
 
     status = serf_http2__enqueue_frame(stream->h2, next, TRUE);
 
-    if (!end_stream)
-        return APR_ENOTIMPL; /* The rest of the body won't be sent yet */
+    if (!end_stream) {
+        /* Write more later */
+        serf_http2__ensure_writable(stream);
+    }
 
     return status;
 }
 
 apr_status_t
+serf_http2__stream_write_data(serf_http2_stream_t *stream)
+{
+    SERF_H2_assert(stream->status == H2S_OPEN
+                   || stream->status == H2S_HALFCLOSED_REMOTE);
+    SERF_H2_assert(stream->data->data_tail != NULL);
+
+    return stream_send_data(stream, stream->data->data_tail);
+}
+
+apr_status_t destroy_request_bucket(void *baton,
+                                    apr_uint64_t bytes_read)
+{
+    serf_request_t *request = baton;
+
+    serf_bucket_destroy(request->req_bkt);
+    request->req_bkt = NULL;
+    request->writing = SERF_WRITING_FINISHED;
+    return APR_SUCCESS;
+}
+
+apr_status_t
 serf_http2__stream_setup_next_request(serf_http2_stream_t *stream,
                                       serf_connection_t *conn,
                                       apr_size_t max_payload_size,
@@ -337,13 +364,20 @@ serf_http2__stream_setup_next_request(se
 
     if (end_stream) {
         stream->status = H2S_HALFCLOSED_LOCAL; /* Headers sent; no body */
-    }
-    else {
-        stream->status = H2S_OPEN; /* Headers sent. Body to go */
-        /* ### TODO: Schedule body to be sent */
+        return APR_SUCCESS;
     }
 
-    return APR_SUCCESS;
+    /* Yuck... we are not allowed to destroy body */
+    body = serf_bucket_barrier_create(body, request->allocator);
+
+    /* Setup an event bucket to destroy the actual request bucket when
+       the body is done */
+    body = serf__bucket_event_create(body, request,
+                                     NULL, NULL, destroy_request_bucket,
+                                     request->allocator);
+
+    stream->status = H2S_OPEN; /* Headers sent. Body to go */
+    return stream_send_data(stream, body);
 }
 
 apr_status_t
@@ -442,7 +476,7 @@ http2_stream_enqueue_response(serf_incom
     if (status)
         return status;
 
-    return stream_send_body(stream, response_bkt);
+    return stream_send_data(stream, response_bkt);
 }
 
 static apr_status_t

Modified: serf/trunk/test/test_server.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/test/test_server.c?rev=1714631&r1=1714630&r2=1714631&view=diff
==============================================================================
--- serf/trunk/test/test_server.c (original)
+++ serf/trunk/test/test_server.c Mon Nov 16 17:12:05 2015
@@ -232,9 +232,8 @@ void test_listen_http2(CuTest *tc)
                                        tb->pool);
     CuAssertIntEquals(tc, APR_SUCCESS, status);
 
-    /* Our http2 implementation doesn't support request bodies yet */
-    create_new_request(tb, &handler_ctx[0], "GET", "/", -1);
-    create_new_request(tb, &handler_ctx[1], "GET", "/", -1);
+    create_new_request(tb, &handler_ctx[0], "GET", "/", 1);
+    create_new_request(tb, &handler_ctx[1], "GET", "/", 2);
 
     status = run_client_server_loop(tb, num_requests,
                                     handler_ctx, tb->pool);


Reply via email to