Author: rhuijben
Date: Mon Nov 16 12:44:14 2015
New Revision: 1714582

URL: http://svn.apache.org/viewvc?rev=1714582&view=rev
Log:
On incoming connections: implement switching to the http2 protocol. This adds
the protocol switching to the connection and http2 protocol, but doesn't add
request routing for http2 on the server side yet.

Add a test that excercises this code, but currently expect a failure to
handle the actual request.

* incoming.c
  (client_connected): Make the peeking optional.
  (perform_peek_protocol): Use a slightly larger prefix. Switch protocol.
  (read_from_client): Do the right thing if protocol was switched.

  (serf__client_flush): Rename to...
  (serf__incoming_client_flush): ... this and make available within serf.
  (write_to_client): Update caller.
  (hangup_client,
   serf_incoming_set_framing_type): New function.
  (serf__process_client): Delegate work to protocol specific callbacks.
  (serf_incoming_create2): Initialize protocol support.

* protocols/http2_protocol.c
  (http2_protocol_read,
   http2_protocol_write,
   http2_protocol_hangup,
   http2_protocol_teardown): Rename to...
  (http2_outgoing_read,
   http2_outgoing_write,
   http2_outgoing_hangup,
   http2_outgoing_teardown): ... this.
  (http2_incoming_read,
   http2_incoming_write,
   http2_incoming_hangup,
   http2_incoming_teardown): New prototypes.

  (serf_http2_protocol_t): Extend a bit.
  (http2_protocol_cleanup): Use protocol as baton.
  (serf__http2_protocol_init): Tweak init.
  (serf__http2_protocol_init_server): New function.
  (serf_http2__enqueue_frame): Support both client and server.
    Simplify assuming improved flush handling.
  (http2_process): Use h2 stream instead of only that in conn.

  (http2_protocol_read,
   http2_protocol_write,
   http2_protocol_hangup,
   http2_protocol_teardown): Rename to...
  (http2_outgoing_read,
   http2_outgoing_write,
   http2_outgoing_hangup,
   http2_outgoing_teardown): ... this.

  (http2_incoming_read,
   http2_incoming_write,
   http2_incoming_hangup,
   http2_incoming_teardown): New functions.

* serf.h
  (serf_incoming_set_framing_type): New function.

* serf_private.h
  (serf_incoming_t): Tweak varname to match outgoing name.
    Add callbacks.
  (serf__incoming_client_flush,
   serf__http2_protocol_init_server): New function.

* test/test_server.c
  (connection_setup_http2,
   test_listen_http2): New function.

Modified:
    serf/trunk/incoming.c
    serf/trunk/protocols/http2_protocol.c
    serf/trunk/serf.h
    serf/trunk/serf_private.h
    serf/trunk/test/test_server.c

Modified: serf/trunk/incoming.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/incoming.c?rev=1714582&r1=1714581&r2=1714582&view=diff
==============================================================================
--- serf/trunk/incoming.c (original)
+++ serf/trunk/incoming.c Mon Nov 16 12:44:14 2015
@@ -86,12 +86,15 @@ static apr_status_t client_connected(ser
     serf_bucket_aggregate_append(client->ostream_head,
                                  ostream);
 
-    client->proto_peek_bkt = serf_bucket_aggregate_create(client->allocator);
-
-    serf_bucket_aggregate_append(
-                client->proto_peek_bkt,
-                serf_bucket_barrier_create(client->stream,
-                                           client->allocator));
+    if (client->framing_type == SERF_CONNECTION_FRAMING_TYPE_NONE) {
+        client->proto_peek_bkt = serf_bucket_aggregate_create(
+                                        client->allocator);
+
+        serf_bucket_aggregate_append(
+            client->proto_peek_bkt,
+            serf_bucket_barrier_create(client->stream,
+                                       client->allocator));
+    }
 
     return status;
 }
@@ -161,8 +164,8 @@ apr_status_t serf_incoming_response_crea
 
 apr_status_t perform_peek_protocol(serf_incoming_t *client)
 {
-    const char h2prefix[] = "PRI * HTTP/2.0\r\n";
-    const apr_size_t h2prefixlen = 16;
+    const char h2prefix[] = "PRI * HTTP/2.0\r\n\r\n";
+    const apr_size_t h2prefixlen = sizeof(h2prefix) - 1;
     const char *data;
     apr_size_t len;
 
@@ -183,6 +186,8 @@ apr_status_t perform_peek_protocol(serf_
 
         if (len && memcmp(data, h2prefix, len) != 0) {
             /* This is not HTTP/2 */
+            serf_incoming_set_framing_type(client,
+                                           SERF_CONNECTION_FRAMING_TYPE_HTTP1);
 
             /* Easy out */
             serf_bucket_destroy(client->proto_peek_bkt);
@@ -192,7 +197,8 @@ apr_status_t perform_peek_protocol(serf_
         }
         else if (len == h2prefixlen) {
             /* We have HTTP/2 */
-            client->framing = SERF_CONNECTION_FRAMING_TYPE_HTTP2;
+            serf_incoming_set_framing_type(client,
+                                           SERF_CONNECTION_FRAMING_TYPE_HTTP2);
 
             serf_bucket_destroy(client->proto_peek_bkt);
             client->proto_peek_bkt = NULL;
@@ -218,6 +224,8 @@ apr_status_t perform_peek_protocol(serf_
 
         if (len && memcmp(data, h2prefix, len)) {
             /* This is not HTTP/2 */
+            serf_incoming_set_framing_type(client,
+                                           SERF_CONNECTION_FRAMING_TYPE_HTTP1);
 
             /* Put data ahead of other data and do the usual thing */
             serf_bucket_aggregate_prepend(client->proto_peek_bkt,
@@ -230,7 +238,8 @@ apr_status_t perform_peek_protocol(serf_
         }
         else if (len == h2prefixlen) {
             /* We have HTTP/2 */
-            client->framing = SERF_CONNECTION_FRAMING_TYPE_HTTP2;
+            serf_incoming_set_framing_type(client,
+                                           SERF_CONNECTION_FRAMING_TYPE_HTTP2);
 
             /* Put data ahead of other data and do the usual thing */
             serf_bucket_aggregate_prepend(client->proto_peek_bkt,
@@ -256,6 +265,10 @@ static apr_status_t read_from_client(ser
         status = perform_peek_protocol(client);
         if (status)
             return status;
+
+        /* Did we switch protocol? */
+        if (client->perform_read != read_from_client)
+            return client->perform_read(client);
     }
 
     do {
@@ -386,8 +399,8 @@ static apr_status_t no_more_writes(serf_
   return APR_SUCCESS;
 }
 
-static apr_status_t serf__client_flush(serf_incoming_t *client,
-                                       bool pump)
+apr_status_t serf__incoming_client_flush(serf_incoming_t *client,
+                                         bool pump)
 {
     apr_status_t status = APR_SUCCESS;
     apr_status_t read_status = APR_SUCCESS;
@@ -464,7 +477,7 @@ static apr_status_t write_to_client(serf
 {
     apr_status_t status;
 
-    status = serf__client_flush(client, true);
+    status = serf__incoming_client_flush(client, true);
 
     if (APR_STATUS_IS_EAGAIN(status))
         return APR_SUCCESS;
@@ -478,6 +491,46 @@ static apr_status_t write_to_client(serf
     return APR_SUCCESS;
 }
 
+static apr_status_t hangup_client(serf_incoming_t *client)
+{
+    return APR_ECONNRESET;
+}
+
+
+void serf_incoming_set_framing_type(
+    serf_incoming_t *client,
+    serf_connection_framing_type_t framing_type)
+{
+    client->framing_type = framing_type;
+
+    if (client->skt) {
+        client->dirty_conn = true;
+        client->ctx->dirty_pollset = true;
+        client->stop_writing = 0;
+
+        /* Close down existing protocol */
+        if (client->protocol_baton && client->perform_teardown) {
+            client->perform_teardown(client);
+            client->protocol_baton = NULL;
+        }
+
+        /* Reset to default */
+        client->perform_read = read_from_client;
+        client->perform_write = write_to_client;
+        client->perform_hangup = hangup_client;
+        client->perform_teardown = NULL;
+
+        switch (framing_type) {
+            case SERF_CONNECTION_FRAMING_TYPE_HTTP2:
+                serf__http2_protocol_init_server(client);
+                break;
+            default:
+                break;
+        }
+    }
+}
+
+
 apr_status_t serf__process_client(serf_incoming_t *client, apr_int16_t events)
 {
     apr_status_t rv;
@@ -491,14 +544,17 @@ apr_status_t serf__process_client(serf_i
     }
 
     if ((events & APR_POLLIN) != 0) {
-        rv = read_from_client(client);
+        rv = client->perform_read(client);
         if (rv) {
             return rv;
         }
     }
 
     if ((events & APR_POLLHUP) != 0) {
-        return APR_ECONNRESET;
+        rv = client->perform_hangup(client);
+        if (rv) {
+            return rv;
+        }
     }
 
     if ((events & APR_POLLERR) != 0) {
@@ -506,7 +562,7 @@ apr_status_t serf__process_client(serf_i
     }
 
     if ((events & APR_POLLOUT) != 0) {
-        rv = write_to_client(client);
+        rv = client->perform_write(client);
         if (rv) {
             return rv;
         }
@@ -591,6 +647,8 @@ apr_status_t serf_incoming_create2(
     ic->dirty_conn = false;
     ic->wait_for_connect = true;
     ic->vec_len = 0;
+    /* Detect HTTP 1 or 2 via peek operation */
+    ic->framing_type = SERF_CONNECTION_FRAMING_TYPE_NONE;
 
     ic->setup = setup;
     ic->setup_baton = setup_baton;
@@ -604,6 +662,10 @@ apr_status_t serf_incoming_create2(
     ic->ssltunnel_ostream = NULL;
 
     ic->protocol_baton = NULL;
+    ic->perform_read = read_from_client;
+    ic->perform_write = write_to_client;
+    ic->perform_hangup = hangup_client;
+    ic->perform_teardown = NULL;
     ic->current_request = NULL;
 
     ic->desc.desc_type = APR_POLL_SOCKET;

Modified: serf/trunk/protocols/http2_protocol.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.c?rev=1714582&r1=1714581&r2=1714582&view=diff
==============================================================================
--- serf/trunk/protocols/http2_protocol.c (original)
+++ serf/trunk/protocols/http2_protocol.c Mon Nov 16 12:44:14 2015
@@ -32,16 +32,28 @@
 #include "protocols/http2_protocol.h"
 
 static apr_status_t
-http2_protocol_read(serf_connection_t *conn);
+http2_outgoing_read(serf_connection_t *conn);
 
 static apr_status_t
-http2_protocol_write(serf_connection_t *conn);
+http2_outgoing_write(serf_connection_t *conn);
 
 static apr_status_t
-http2_protocol_hangup(serf_connection_t *conn);
+http2_outgoing_hangup(serf_connection_t *conn);
 
 static void
-http2_protocol_teardown(serf_connection_t *conn);
+http2_outgoing_teardown(serf_connection_t *conn);
+
+static apr_status_t
+http2_incoming_read(serf_incoming_t *client);
+
+static apr_status_t
+http2_incoming_write(serf_incoming_t *client);
+
+static apr_status_t
+http2_incoming_hangup(serf_incoming_t *client);
+
+static void
+http2_incoming_teardown(serf_incoming_t *conn);
 
 static apr_status_t
 http2_process(serf_http2_protocol_t *h2);
@@ -105,14 +117,16 @@ serf_bucket_create_numberv(serf_bucket_a
 struct serf_http2_protocol_t
 {
   apr_pool_t *pool;
-  serf_connection_t *conn;
-  serf_bucket_t *ostream;
+  serf_connection_t *conn; /* Either CONN or CLIENT is set */
+  serf_incoming_t *client;
+  serf_bucket_t *stream, *ostream;
   serf_bucket_alloc_t *allocator;
 
   serf_http2_processor_t processor;
   void *processor_baton;
   serf_bucket_t *read_frame;   /* Frame currently being read */
-  int in_frame;
+  bool in_frame;
+  apr_size_t prefix_left;
 
   serf_hpack_table_t *hpack_tbl;
   serf_config_t *config;
@@ -125,7 +139,7 @@ struct serf_http2_protocol_t
   apr_uint32_t lr_max_concurrent;
   apr_uint32_t lr_hpack_table_size;
   apr_int32_t lr_next_streamid;
-  char lr_push_enabled;
+  bool lr_push_enabled;
 
   /* Remote -> Local. Settings set by us. Acknowledged by other side */
   apr_uint32_t rl_default_window;
@@ -135,13 +149,13 @@ struct serf_http2_protocol_t
   apr_uint32_t rl_max_concurrent;
   apr_uint32_t rl_hpack_table_size;
   apr_int32_t rl_next_streamid;
-  char rl_push_enabled;
+  bool rl_push_enabled;
 
   serf_http2_stream_t *first;
   serf_http2_stream_t *last;
 
   int setting_acks;
-  int enforce_flow_control;
+  bool enforce_flow_control;
 
   serf_bucket_t *continuation_bucket;
   apr_int32_t continuation_streamid;
@@ -157,8 +171,9 @@ http2_bucket_processor(void *baton,
 static apr_status_t
 http2_protocol_cleanup(void *state)
 {
-  serf_connection_t *conn = state;
-  serf_http2_protocol_t *h2 = conn->protocol_baton;
+  serf_http2_protocol_t *h2 = state;
+  serf_connection_t *conn = h2->conn;
+  serf_incoming_t *client = h2->client;
   serf_http2_stream_t *stream, *next;
 
   /* First clean out all streams */
@@ -183,7 +198,7 @@ http2_protocol_cleanup(void *state)
 
           h2->processor = NULL;
           h2->processor_baton = NULL;
-          
+
         }
       /* Else: The processor (probably a stream)
                needs to handle this. It usually does that
@@ -197,7 +212,11 @@ http2_protocol_cleanup(void *state)
     }
   h2->in_frame = FALSE;
 
-  conn->protocol_baton = NULL;
+  if (conn)
+      conn->protocol_baton = NULL;
+  if (client)
+      client->protocol_baton = NULL;
+
   return APR_SUCCESS;
 }
 
@@ -206,13 +225,14 @@ void serf__http2_protocol_init(serf_conn
   serf_http2_protocol_t *h2;
   apr_pool_t *protocol_pool;
   serf_bucket_t *tmp;
-  const int WE_ARE_CLIENT = 1;
+  const bool WE_ARE_CLIENT = true;
 
   apr_pool_create(&protocol_pool, conn->pool);
 
   h2 = apr_pcalloc(protocol_pool, sizeof(*h2));
   h2->pool = protocol_pool;
   h2->conn = conn;
+  h2->stream = conn->stream;
   h2->ostream = conn->ostream_tail;
   h2->allocator = conn->allocator;
   h2->config = conn->config;
@@ -247,13 +267,13 @@ void serf__http2_protocol_init(serf_conn
                                            HTTP2_DEFAULT_HPACK_TABLE_SIZE,
                                            protocol_pool);
 
-  apr_pool_cleanup_register(protocol_pool, conn, http2_protocol_cleanup,
+  apr_pool_cleanup_register(protocol_pool, h2, http2_protocol_cleanup,
                             apr_pool_cleanup_null);
 
-  conn->perform_read = http2_protocol_read;
-  conn->perform_write = http2_protocol_write;
-  conn->perform_hangup = http2_protocol_hangup;
-  conn->perform_teardown = http2_protocol_teardown;
+  conn->perform_read = http2_outgoing_read;
+  conn->perform_write = http2_outgoing_write;
+  conn->perform_hangup = http2_outgoing_hangup;
+  conn->perform_teardown = http2_outgoing_teardown;
   conn->protocol_baton = h2;
 
   /* Disable HTTP/1.1 guessing that affects writability */
@@ -261,8 +281,7 @@ void serf__http2_protocol_init(serf_conn
   conn->max_outstanding_requests = 0;
 
   /* Send the HTTP/2 Connection Preface */
-  tmp = SERF_BUCKET_SIMPLE_STRING("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n",
-                                  conn->allocator);
+  tmp = SERF_BUCKET_SIMPLE_STRING(HTTP2_CONNECTION_PREFIX, h2->allocator);
   serf_bucket_aggregate_append(h2->ostream, tmp);
 
   /* And now a settings frame and a huge window */
@@ -272,7 +291,7 @@ void serf__http2_protocol_init(serf_conn
     tmp = serf__bucket_http2_frame_create(NULL, HTTP2_FRAME_TYPE_SETTINGS, 0,
                                           NULL, NULL, NULL, /* stream: 0 */
                                           h2->lr_max_framesize,
-                                          conn->allocator);
+                                          h2->allocator);
 
     serf_http2__enqueue_frame(h2, tmp, FALSE);
 
@@ -282,13 +301,95 @@ void serf__http2_protocol_init(serf_conn
                                           HTTP2_FRAME_TYPE_WINDOW_UPDATE, 0,
                                           NULL, NULL, NULL, /* stream: 0 */
                                           h2->lr_max_framesize,
-                                          conn->allocator);
+                                          h2->allocator);
     serf_http2__enqueue_frame(h2, tmp, FALSE);
 
     h2->rl_window += 0x40000000; /* And update our own administration */
   }
 }
 
+void serf__http2_protocol_init_server(serf_incoming_t *client)
+{
+    serf_http2_protocol_t *h2;
+    apr_pool_t *protocol_pool;
+    serf_bucket_t *tmp;
+    const int WE_ARE_CLIENT = false;
+
+    apr_pool_create(&protocol_pool, client->pool);
+
+    h2 = apr_pcalloc(protocol_pool, sizeof(*h2));
+    h2->pool = protocol_pool;
+    h2->client = client;
+    h2->stream = client->stream;
+    h2->ostream = client->ostream_tail;
+    h2->allocator = client->allocator;
+    h2->config = client->config;
+
+    h2->prefix_left = sizeof(HTTP2_CONNECTION_PREFIX) - 1;
+
+    /* Defaults until negotiated */
+    h2->rl_default_window = HTTP2_DEFAULT_WINDOW_SIZE;
+    h2->rl_window = HTTP2_DEFAULT_WINDOW_SIZE;
+    h2->rl_next_streamid = WE_ARE_CLIENT ? 2 : 1;
+    h2->rl_max_framesize = HTTP2_DEFAULT_MAX_FRAMESIZE;
+    h2->rl_max_headersize = APR_UINT32_MAX;
+    h2->rl_max_concurrent = HTTP2_DEFAULT_MAX_CONCURRENT;
+    h2->rl_hpack_table_size = HTTP2_DEFAULT_HPACK_TABLE_SIZE;
+    h2->rl_push_enabled = TRUE;
+
+    h2->lr_default_window = HTTP2_DEFAULT_WINDOW_SIZE;
+    h2->lr_window = HTTP2_DEFAULT_WINDOW_SIZE;
+    h2->lr_next_streamid = WE_ARE_CLIENT ? 1 : 2;
+    h2->lr_max_framesize = HTTP2_DEFAULT_MAX_FRAMESIZE;
+    h2->lr_max_headersize = APR_UINT32_MAX;
+    h2->lr_max_concurrent = HTTP2_DEFAULT_MAX_CONCURRENT;
+    h2->lr_hpack_table_size = HTTP2_DEFAULT_HPACK_TABLE_SIZE;
+    h2->lr_push_enabled = TRUE;
+
+    h2->setting_acks = 0;
+    h2->enforce_flow_control = TRUE;
+    h2->continuation_bucket = NULL;
+    h2->continuation_streamid = 0;
+
+    h2->first = h2->last = NULL;
+
+    h2->hpack_tbl = serf__hpack_table_create(TRUE,
+                                             HTTP2_DEFAULT_HPACK_TABLE_SIZE,
+                                             protocol_pool);
+
+    apr_pool_cleanup_register(protocol_pool, h2, http2_protocol_cleanup,
+                              apr_pool_cleanup_null);
+
+    client->perform_read = http2_incoming_read;
+    client->perform_write = http2_incoming_write;
+    client->perform_hangup = http2_incoming_hangup;
+    client->perform_teardown = http2_incoming_teardown;
+    client->protocol_baton = h2;
+
+    /* Send a settings frame and a huge window */
+    {
+        serf_bucket_t *window_size;
+
+        tmp = serf__bucket_http2_frame_create(NULL, HTTP2_FRAME_TYPE_SETTINGS, 
0,
+                                              NULL, NULL, NULL, /* stream: 0 */
+                                              h2->lr_max_framesize,
+                                              h2->allocator);
+
+        serf_http2__enqueue_frame(h2, tmp, FALSE);
+
+        /* Add 1GB to the current window. */
+        window_size = serf_bucket_create_numberv(h2->allocator, "4", 
0x40000000);
+        tmp = serf__bucket_http2_frame_create(window_size,
+                                              HTTP2_FRAME_TYPE_WINDOW_UPDATE, 
0,
+                                              NULL, NULL, NULL, /* stream: 0 */
+                                              h2->lr_max_framesize,
+                                              h2->allocator);
+        serf_http2__enqueue_frame(h2, tmp, FALSE);
+
+        h2->rl_window += 0x40000000; /* And update our own administration */
+    }
+}
+
 /* Creates a HTTP/2 request from a serf request */
 static apr_status_t
 enqueue_http2_request(serf_http2_protocol_t *h2)
@@ -321,7 +422,9 @@ serf_http2__enqueue_frame(serf_http2_pro
 {
   apr_status_t status;
 
-  if (!pump && !h2->conn->dirty_conn)
+  if (!pump
+      && !((h2->conn && h2->conn->dirty_conn)
+           || (h2->client && h2->client->dirty_conn)))
     {
       const char *data;
       apr_size_t len;
@@ -333,7 +436,7 @@ serf_http2__enqueue_frame(serf_http2_pro
       if (SERF_BUCKET_READ_ERROR(status))
         {
           serf_bucket_destroy(frame);
-          return status; 
+          return status;
         }
 
       if (len == 0)
@@ -349,22 +452,24 @@ serf_http2__enqueue_frame(serf_http2_pro
     return APR_SUCCESS;
 
   /* Flush final output buffer (after ssl, etc.) */
-  status = serf__connection_flush(h2->conn, FALSE);
-  if (SERF_BUCKET_READ_ERROR(status))
-    return status;
-
-  /* Write new data to output buffer if necessary and
-     flush again */
-  if (!status)
+  if (h2->conn)
     status = serf__connection_flush(h2->conn, TRUE);
+  else
+    status = serf__incoming_client_flush(h2->client, TRUE);
 
   if (APR_STATUS_IS_EAGAIN(status))
-    {
-      h2->conn->dirty_conn = TRUE;
-      h2->conn->ctx->dirty_pollset = TRUE;
-    }
-  else if (SERF_BUCKET_READ_ERROR(status))
-    return status;
+      return APR_SUCCESS;
+  else if (status)
+      return status;
+
+  if (h2->conn) {
+    h2->conn->dirty_conn = true;
+    h2->conn->ctx->dirty_pollset = true;
+  }
+  else {
+    h2->client->dirty_conn = true;
+    h2->client->ctx->dirty_pollset = true;
+  }
 
   return APR_SUCCESS;
 }
@@ -960,7 +1065,7 @@ http2_process(serf_http2_protocol_t *h2)
           SERF_H2_assert(!h2->in_frame);
 
           body = serf__bucket_http2_unframe_create(
-                                             h2->conn->stream,
+                                             h2->stream,
                                              h2->rl_max_framesize,
                                              h2->allocator);
 
@@ -1459,7 +1564,7 @@ http2_process(serf_http2_protocol_t *h2)
 }
 
 static apr_status_t
-http2_protocol_read(serf_connection_t *conn)
+http2_outgoing_read(serf_connection_t *conn)
 {
   apr_status_t status;
 
@@ -1492,7 +1597,7 @@ http2_protocol_read(serf_connection_t *c
 }
 
 static apr_status_t
-http2_protocol_write(serf_connection_t *conn)
+http2_outgoing_write(serf_connection_t *conn)
 {
   serf_http2_protocol_t *h2 = conn->protocol_baton;
   apr_status_t status;
@@ -1520,7 +1625,7 @@ http2_protocol_write(serf_connection_t *
 }
 
 static apr_status_t
-http2_protocol_hangup(serf_connection_t *conn)
+http2_outgoing_hangup(serf_connection_t *conn)
 {
   /* serf_http2_protocol_t *ctx = conn->protocol_baton; */
 
@@ -1528,7 +1633,7 @@ http2_protocol_hangup(serf_connection_t
 }
 
 static void
-http2_protocol_teardown(serf_connection_t *conn)
+http2_outgoing_teardown(serf_connection_t *conn)
 {
   serf_http2_protocol_t *ctx = conn->protocol_baton;
 
@@ -1536,6 +1641,121 @@ http2_protocol_teardown(serf_connection_
   conn->protocol_baton = NULL;
 }
 
+static apr_status_t
+http2_incoming_read(serf_incoming_t *client)
+{
+    apr_status_t status;
+    serf_http2_protocol_t *h2 = client->protocol_baton;
+
+    /* If the stop_writing flag was set on the connection, reset it now because
+    there is some data to read. */
+    if (client->stop_writing)
+    {
+        client->stop_writing = 0;
+        client->dirty_conn = 1;
+        client->ctx->dirty_pollset = 1;
+    }
+
+    if (h2->prefix_left) {
+        serf_bucket_t *stream;
+
+        if (client->proto_peek_bkt)
+            stream = client->proto_peek_bkt;
+        else
+            stream = client->stream;
+
+        do {
+            const char *data;
+            apr_size_t len;
+
+            status = serf_bucket_read(stream, h2->prefix_left,
+                                      &data, &len);
+
+            if (!SERF_BUCKET_READ_ERROR(status)) {
+                if (len && memcmp(data,
+                                  HTTP2_CONNECTION_PREFIX - h2->prefix_left - 1
+                                  + sizeof(HTTP2_CONNECTION_PREFIX),
+                                  len) != 0)
+                {
+                    return SERF_ERROR_HTTP2_PROTOCOL_ERROR;
+                }
+                h2->prefix_left -= len;
+            }
+        } while (status == APR_SUCCESS && h2->prefix_left);
+
+        if (!h2->prefix_left && client->proto_peek_bkt) {
+            /* Peek buffer is now empty. Use actual stream */
+            serf_bucket_destroy(client->proto_peek_bkt);
+            client->proto_peek_bkt = NULL;
+
+            h2->stream = client->stream;
+        }
+
+        if (APR_STATUS_IS_EAGAIN(status) || status == SERF_ERROR_WAIT_CONN)
+        {
+            return APR_SUCCESS;
+        }
+        else if (status) {
+            return status;
+        }
+    }
+
+    status = http2_process(h2);
+
+    if (!status)
+        return APR_SUCCESS;
+    else if (APR_STATUS_IS_EOF(status))
+    {
+        /* TODO: Teardown connection, reset if necessary, etc. */
+        return status;
+    }
+    else if (APR_STATUS_IS_EAGAIN(status)
+             || status == SERF_ERROR_WAIT_CONN)
+    {
+        /* Update pollset, etc. etc. */
+        return APR_SUCCESS;
+    }
+    else
+        return status;
+}
+
+static apr_status_t
+http2_incoming_write(serf_incoming_t *client)
+{
+    /* serf_http2_protocol_t *h2 = client->protocol_baton; */
+    apr_status_t status;
+
+    status = serf__incoming_client_flush(client, TRUE);
+
+    if (APR_STATUS_IS_EAGAIN(status))
+        return APR_SUCCESS;
+    else if (status)
+        return status;
+
+    /* Probably nothing to write. Connection will check new requests */
+    client->dirty_conn = true;
+    client->ctx->dirty_pollset = true;
+
+    return APR_SUCCESS;
+}
+
+static apr_status_t
+http2_incoming_hangup(serf_incoming_t *client)
+{
+    /* serf_http2_protocol_t *ctx = conn->protocol_baton; */
+
+    return APR_EGENERAL;
+}
+
+static void
+http2_incoming_teardown(serf_incoming_t *client)
+{
+    serf_http2_protocol_t *ctx = client->protocol_baton;
+
+    apr_pool_destroy(ctx->pool);
+    client->protocol_baton = NULL;
+}
+
 void
 serf_http2__allocate_stream_id(void *baton,
                                apr_int32_t *streamid)

Modified: serf/trunk/serf.h
URL: 
http://svn.apache.org/viewvc/serf/trunk/serf.h?rev=1714582&r1=1714581&r2=1714582&view=diff
==============================================================================
--- serf/trunk/serf.h (original)
+++ serf/trunk/serf.h Mon Nov 16 12:44:14 2015
@@ -627,6 +627,13 @@ void serf_connection_set_framing_type(
   serf_connection_framing_type_t framing_type);
 
 /**
+ * @since New in 1.4.
+ */
+void serf_incoming_set_framing_type(
+    serf_incoming_t *client,
+    serf_connection_framing_type_t framing_type);
+
+/**
  * Setup the @a request for delivery on its connection.
  *
  * Right before this is invoked, @a pool will be built within the

Modified: serf/trunk/serf_private.h
URL: 
http://svn.apache.org/viewvc/serf/trunk/serf_private.h?rev=1714582&r1=1714581&r2=1714582&view=diff
==============================================================================
--- serf/trunk/serf_private.h (original)
+++ serf/trunk/serf_private.h Mon Nov 16 12:44:14 2015
@@ -368,13 +368,21 @@ struct serf_incoming_t {
     serf_incoming_closed_t closed;
     void *closed_baton;
 
-    serf_connection_framing_type_t framing;
+    serf_connection_framing_type_t framing_type;
 
     bool dirty_conn;
     bool wait_for_connect;
     bool hit_eof;
     bool stop_writing;
 
+    /* Event callbacks, called from serf__process_client() to do the actual
+    processing. */
+    apr_status_t(*perform_read)(serf_incoming_t *client);
+    apr_status_t(*perform_write)(serf_incoming_t *client);
+    apr_status_t(*perform_hangup)(serf_incoming_t *client);
+
+    /* Cleanup of protocol handling */
+    void(*perform_teardown)(serf_incoming_t *conn);
     void *protocol_baton;
 
     /* A bucket wrapped around our socket (for reading responses). */
@@ -629,6 +637,7 @@ void serf__context_progress_delta(void *
 apr_status_t serf__process_client(serf_incoming_t *l, apr_int16_t events);
 apr_status_t serf__process_listener(serf_listener_t *l);
 apr_status_t serf__incoming_update_pollset(serf_incoming_t *incoming);
+apr_status_t serf__incoming_client_flush(serf_incoming_t *client, bool pump);
 
 /* from outgoing.c */
 apr_status_t serf__open_connections(serf_context_t *ctx);
@@ -666,6 +675,7 @@ serf_bucket_t *serf__bucket_log_wrapper_
 
 /* From http2_protocol.c: Initializes http2 state on connection */
 void serf__http2_protocol_init(serf_connection_t *conn);
+void serf__http2_protocol_init_server(serf_incoming_t *client);
 
 typedef struct serf_hpack_table_t serf_hpack_table_t;
 

Modified: serf/trunk/test/test_server.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/test/test_server.c?rev=1714582&r1=1714581&r2=1714582&view=diff
==============================================================================
--- serf/trunk/test/test_server.c (original)
+++ serf/trunk/test/test_server.c Mon Nov 16 12:44:14 2015
@@ -183,6 +183,22 @@ run_client_server_loop(test_baton_t *tb,
     return APR_SUCCESS;
 }
 
+static apr_status_t connection_setup_http2(apr_socket_t *skt,
+                                           serf_bucket_t **read_bkt,
+                                           serf_bucket_t **write_bkt,
+                                           void *setup_baton,
+                                           apr_pool_t *pool)
+{
+    test_baton_t *tb = setup_baton;
+
+    /* Would be nice to be able to call default_http_conn_setup */
+    *read_bkt = serf_bucket_socket_create(skt, tb->bkt_alloc);
+
+    serf_connection_set_framing_type(tb->connection,
+                                     SERF_CONNECTION_FRAMING_TYPE_HTTP2);
+
+    return APR_SUCCESS;
+}
 void test_listen_http(CuTest *tc)
 {
     test_baton_t *tb = tc->testBaton;
@@ -203,6 +219,27 @@ void test_listen_http(CuTest *tc)
     CuAssertIntEquals(tc, APR_SUCCESS, status);
 }
 
+void test_listen_http2(CuTest *tc)
+{
+    test_baton_t *tb = tc->testBaton;
+    apr_status_t status;
+    handler_baton_t handler_ctx[2];
+    const int num_requests = sizeof(handler_ctx) / sizeof(handler_ctx[0]);
+
+    setup_test_server(tb);
+
+    status = setup_test_client_context(tb, connection_setup_http2,
+                                       tb->pool);
+    CuAssertIntEquals(tc, APR_SUCCESS, status);
+
+    create_new_request(tb, &handler_ctx[0], "GET", "/", 1);
+    create_new_request(tb, &handler_ctx[1], "GET", "/", 2);
+
+    status = run_client_server_loop(tb, num_requests,
+                                    handler_ctx, tb->pool);
+    CuAssertIntEquals(tc, SERF_ERROR_HTTP2_PROTOCOL_ERROR, status);
+}
+
 /*****************************************************************************/
 CuSuite *test_server(void)
 {
@@ -211,6 +248,7 @@ CuSuite *test_server(void)
     CuSuiteSetSetupTeardownCallbacks(suite, test_setup, test_teardown);
 
     SUITE_ADD_TEST(suite, test_listen_http);
+    SUITE_ADD_TEST(suite, test_listen_http2);
 
     return suite;
 }


Reply via email to