Author: rhuijben
Date: Wed Oct 28 12:11:37 2015
New Revision: 1710998

URL: http://svn.apache.org/viewvc?rev=1710998&view=rev
Log:
Improve the http2 protocol handler's event handling by adding a bit
more abstraction on writing frames.

* protocols/http2_protocol.c
  (serf_http2_stream_t): Move the typedef to the .h file.
  (serf_http2_procotol_state_t): Rename to...
  (serf_http2_protocol_t): ... this. Move typedef to .h.
    Add connection.
  (serf__http2_protocol_init): Update init. Use new frame
    write function.
  (setup_for_http2): Use new frame write function.
  (serf_http2__enqueue_frame): New function.
  (http2_read): Move request create code to write callback. Tweak
    frame writing and logging.
  (http2_protocol_write): Move request create code here.

* protocols/http2_protocol.h
  (includes): Declare some dependencies.
  (SERF_LOGHTTP2): New define.
  (serf_http2_protocol_t,
   serf_http2_stream_t): Declare here to allow using in multiple c files.
  (serf_http2__enqueue_frame): New function.

Modified:
    serf/trunk/protocols/http2_protocol.c
    serf/trunk/protocols/http2_protocol.h

Modified: serf/trunk/protocols/http2_protocol.c
URL: http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.c?rev=1710998&r1=1710997&r2=1710998&view=diff
==============================================================================
--- serf/trunk/protocols/http2_protocol.c (original)
+++ serf/trunk/protocols/http2_protocol.c Wed Oct 28 12:11:37 2015
@@ -128,9 +128,9 @@ serf_bucket_create_numberv(serf_bucket_a
 }
 
 
-typedef struct serf_http2_stream_t
+struct serf_http2_stream_t
 {
-  struct serf_http2_procotol_state_t *ctx;
+  struct serf_http2_protocol_t *ctx;
 
   /* Linked list of currently existing streams */
   struct serf_http2_stream_t *next;
@@ -156,11 +156,12 @@ typedef struct serf_http2_stream_t
   } status;
 
   /* TODO: Priority, etc. */
-} serf_http2_stream_t;
+};
 
-typedef struct serf_http2_procotol_state_t
+struct serf_http2_protocol_t
 {
   apr_pool_t *pool;
+  serf_connection_t *conn;
   serf_bucket_t *ostream;
 
   serf_hpack_table_t *hpack_tbl;
@@ -179,13 +180,13 @@ typedef struct serf_http2_procotol_state
   serf_bucket_t *cur_payload;
   int in_payload;
 
-} serf_http2_procotol_state_t;
+};
 
 static apr_status_t
 http2_protocol_cleanup(void *state)
 {
   serf_connection_t *conn = state;
-  /* serf_http2_procotol_state_t *ctx = conn->protocol_baton; */
+  /* serf_http2_protocol_t *ctx = conn->protocol_baton; */
 
   conn->protocol_baton = NULL;
   return APR_SUCCESS;
@@ -193,7 +194,7 @@ http2_protocol_cleanup(void *state)
 
 void serf__http2_protocol_init(serf_connection_t *conn)
 {
-  serf_http2_procotol_state_t *ctx;
+  serf_http2_protocol_t *ctx;
   apr_pool_t *protocol_pool;
   serf_bucket_t *tmp;
 
@@ -201,6 +202,7 @@ void serf__http2_protocol_init(serf_conn
 
   ctx = apr_pcalloc(protocol_pool, sizeof(*ctx));
   ctx->pool = protocol_pool;
+  ctx->conn = conn;
   ctx->ostream = conn->ostream_tail;
   ctx->lr_window = HTTP2_DEFAULT_WINDOW_SIZE;
   ctx->rl_window = HTTP2_DEFAULT_WINDOW_SIZE;
@@ -242,7 +244,7 @@ void serf__http2_protocol_init(serf_conn
                                           HTTP2_DEFAULT_MAX_FRAMESIZE,
                                           NULL, NULL, conn->allocator);
 
-    serf_bucket_aggregate_append(ctx->ostream, tmp);
+    serf_http2__enqueue_frame(ctx, tmp, FALSE);
 
     /* Add 2GB - 65535 to the current window.
        (Adding 2GB -1 appears to overflow at at least one server) */
@@ -252,13 +254,13 @@ void serf__http2_protocol_init(serf_conn
                                           NULL, NULL, NULL,
                                           HTTP2_DEFAULT_MAX_FRAMESIZE,
                                           NULL, NULL, conn->allocator);
-    serf_bucket_aggregate_append(ctx->ostream, tmp);
+    serf_http2__enqueue_frame(ctx, tmp, TRUE);
   }
 }
 
 /* Creates a HTTP/2 request from a serf request */
 static apr_status_t
-setup_for_http2(serf_http2_procotol_state_t *ctx,
+setup_for_http2(serf_http2_protocol_t *ctx,
                 serf_request_t *request)
 {
   apr_status_t status;
@@ -298,41 +300,73 @@ setup_for_http2(serf_http2_procotol_stat
                                           HTTP2_DEFAULT_MAX_FRAMESIZE,
                                           NULL, NULL, request->allocator);
 
-  serf_bucket_aggregate_append(ctx->ostream, hpack);
+  serf_http2__enqueue_frame(ctx, hpack, TRUE);
 
   return APR_SUCCESS;
 }
 
-static apr_status_t
-http2_read(serf_connection_t *conn)
+apr_status_t
+serf_http2__enqueue_frame(serf_http2_protocol_t *h2,
+                          serf_bucket_t *frame,
+                          int pump)
 {
-  serf_http2_procotol_state_t *ctx = conn->protocol_baton;
-  apr_status_t status = APR_SUCCESS;
+  apr_status_t status;
 
-  while (TRUE)
+  if (!pump && !h2->conn->dirty_conn)
     {
-      serf_request_t *request = conn->unwritten_reqs;
-      status = APR_SUCCESS;
+      const char *data;
+      apr_size_t len;
 
-      if (request)
+      /* Cheap check to see if we should request a write
+         event next time around */
+      status = serf_bucket_peek(h2->ostream, &data, &len);
+
+      if (SERF_BUCKET_READ_ERROR(status))
+        return status;
+
+      if (len == 0)
         {
-          /* Yuck.. there must be easier ways to do this, but I don't
-              want to change outgoing.c all the time just yet. */
-          conn->unwritten_reqs = request->next;
-          if (conn->unwritten_reqs_tail == request)
-            conn->unwritten_reqs = conn->unwritten_reqs_tail = NULL;
+          h2->conn->dirty_conn = TRUE;
+          h2->conn->ctx->dirty_pollset = TRUE;
+        }
+    }
 
-          request->next = NULL;
+  serf_bucket_aggregate_append(h2->ostream, frame);
 
-          if (conn->written_reqs_tail)
-            conn->written_reqs_tail->next = request;
-          else
-            conn->written_reqs = conn->written_reqs_tail = request;
+  if (!pump)
+    return APR_SUCCESS;
 
-          status = setup_for_http2(ctx, request);
-          if (status)
-            return status;
-        }
+  /* Flush final output buffer (after ssl, etc.) */
+  status = serf__connection_flush(h2->conn, FALSE);
+  if (SERF_BUCKET_READ_ERROR(status))
+    return status;
+
+  /* Write new data to output buffer if necessary and
+     flush again */
+  if (!status)
+    status = serf__connection_flush(h2->conn, TRUE);
+
+  if (APR_STATUS_IS_EAGAIN(status))
+    {
+      h2->conn->dirty_conn = TRUE;
+      h2->conn->ctx->dirty_pollset = TRUE;
+    }
+  else if (SERF_BUCKET_READ_ERROR(status))
+    return status;
+
+  return APR_SUCCESS;
+}
+
+
+static apr_status_t
+http2_read(serf_connection_t *conn)
+{
+  serf_http2_protocol_t *ctx = conn->protocol_baton;
+  apr_status_t status = APR_SUCCESS;
+
+  while (TRUE)
+    {
+      status = APR_SUCCESS;
 
       if (ctx->cur_frame)
         {
@@ -343,14 +377,19 @@ http2_read(serf_connection_t *conn)
             {
               unsigned char flags;
               unsigned char frametype;
+              apr_int32_t streamid;
 
               status = serf__bucket_http2_unframe_read_info(ctx->cur_frame,
-                                                            NULL, &frametype,
+                                                            &streamid, &frametype,
                                                             &flags);
 
-              if (status)
+              if (status && !APR_STATUS_IS_EOF(status))
                 break;
 
+              serf__log(LOGLVL_INFO, SERF_LOGHTTP2, conn->config,
+                        "Start 0x%02x http2 frame on stream 0x%x, flags=0x%x, size=0x%x\n",
+                        (int)frametype, (int)streamid, (int)flags, (int)ctx->buffer_used);
+
               ctx->in_payload = TRUE;
 
               if (flags & HTTP2_FLAG_PADDED)
@@ -395,20 +434,20 @@ http2_read(serf_connection_t *conn)
               serf__bucket_http2_unframe_read_info(ctx->cur_frame,
                                                    &streamid, &frametype,
                                                    &flags);
-              serf__log(LOGLVL_INFO, LOGCOMP_CONN, __FILE__, conn->config,
-                        "Read 0x%02x http2 frame on stream 0x%x, flags=0x%x, size=0x%x\n",
+              serf__log(LOGLVL_INFO, SERF_LOGHTTP2, conn->config,
+                        "Done 0x%02x http2 frame on stream 0x%x, flags=0x%x, size=0x%x\n",
                         (int)frametype, (int)streamid, (int)flags, (int)ctx->buffer_used);
 
               if (frametype == HTTP2_FRAME_TYPE_DATA
                   || frametype == HTTP2_FRAME_TYPE_HEADERS)
                 {
                   /* Ugly hack to dump body. Memory LEAK! */
-                  serf__log(LOGLVL_INFO, LOGCOMP_CONN, __FILE__, conn->config,
+                  serf__log(LOGLVL_INFO, SERF_LOGHTTP2, conn->config,
                             "%s\n", apr_pstrmemdup(conn->pool, ctx->buffer, ctx->buffer_used));
                 }
 
               if (frametype == HTTP2_FRAME_TYPE_GOAWAY && conn)
-                serf__log(LOGLVL_WARNING, LOGCOMP_CONN, __FILE__, conn->config,
+                serf__log(LOGLVL_WARNING, SERF_LOGHTTP2, conn->config,
                           "Go away reason %d: %s\n", ctx->buffer[7],
                                                      apr_pstrmemdup(conn->pool,
                                                                &ctx->buffer[8],
@@ -418,28 +457,30 @@ http2_read(serf_connection_t *conn)
               if (frametype == HTTP2_FRAME_TYPE_SETTINGS)
                 {
                   /* Always ack settings */
-                  serf_bucket_aggregate_append(
-                    ctx->ostream,
+                  serf_http2__enqueue_frame(
+                    ctx,
                     serf__bucket_http2_frame_create(
                                     NULL,
                                     HTTP2_FRAME_TYPE_SETTINGS,
                                     HTTP2_FLAG_ACK,
                                     NULL, NULL, NULL,
                                     HTTP2_DEFAULT_MAX_FRAMESIZE,
-                                    NULL, NULL, conn->allocator));
+                                    NULL, NULL, conn->allocator),
+                    TRUE);
                 }
               else if (frametype == HTTP2_FRAME_TYPE_DATA)
                 {
                   /* Provide a bit of window space to the server after 
                      receiving data */
-                  serf_bucket_aggregate_append(
-                    ctx->ostream,
+                  serf_http2__enqueue_frame(
+                    ctx,
                     serf__bucket_http2_frame_create(
                       serf_bucket_create_numberv(conn->allocator, "4", (apr_int32_t)16384),
                               HTTP2_FRAME_TYPE_WINDOW_UPDATE, 0,
                               &streamid, NULL, NULL,
                               HTTP2_DEFAULT_MAX_FRAMESIZE,
-                              NULL, NULL, conn->allocator));
+                              NULL, NULL, conn->allocator),
+                    TRUE);
                 }
               else if (frametype == HTTP2_FRAME_TYPE_PING)
                 {
@@ -514,9 +555,30 @@ http2_protocol_read(serf_connection_t *c
 static apr_status_t
 http2_protocol_write(serf_connection_t *conn)
 {
-  serf_http2_procotol_state_t *ctx = conn->protocol_baton;
+  serf_http2_protocol_t *ctx = conn->protocol_baton;
+  serf_request_t *request = conn->unwritten_reqs;
   apr_status_t status;
 
+  if (request)
+    {
+      /* Yuck.. there must be easier ways to do this, but I don't
+          want to change outgoing.c all the time just yet. */
+      conn->unwritten_reqs = request->next;
+      if (conn->unwritten_reqs_tail == request)
+        conn->unwritten_reqs = conn->unwritten_reqs_tail = NULL;
+
+      request->next = NULL;
+
+      if (conn->written_reqs_tail)
+        conn->written_reqs_tail->next = request;
+      else
+        conn->written_reqs = conn->written_reqs_tail = request;
+
+      status = setup_for_http2(ctx, request);
+      if (status)
+        return status;
+    }
+
   status = serf__connection_flush(conn, TRUE);
 
   if (APR_STATUS_IS_EAGAIN(status))
@@ -534,7 +596,7 @@ http2_protocol_write(serf_connection_t *
 static apr_status_t
 http2_protocol_hangup(serf_connection_t *conn)
 {
-  /* serf_http2_procotol_state_t *ctx = conn->protocol_baton; */
+  /* serf_http2_protocol_t *ctx = conn->protocol_baton; */
 
   return APR_EGENERAL;
 }
@@ -542,7 +604,7 @@ http2_protocol_hangup(serf_connection_t
 static void
 http2_protocol_teardown(serf_connection_t *conn)
 {
-  serf_http2_procotol_state_t *ctx = conn->protocol_baton;
+  serf_http2_protocol_t *ctx = conn->protocol_baton;
 
   apr_pool_destroy(ctx->pool);
   conn->protocol_baton = NULL;

Modified: serf/trunk/protocols/http2_protocol.h
URL: http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.h?rev=1710998&r1=1710997&r2=1710998&view=diff
==============================================================================
--- serf/trunk/protocols/http2_protocol.h (original)
+++ serf/trunk/protocols/http2_protocol.h Wed Oct 28 12:11:37 2015
@@ -21,6 +21,11 @@
 #ifndef SERF_PROTOCOL_HTTP2_PRIVATE_H
 #define SERF_PROTOCOL_HTTP2_PRIVATE_H
 
+#include "serf.h"
+#include "serf_private.h"
+
+#define SERF_LOGHTTP2 \
+    SERF_LOGCOMP_PROTOCOL, (__FILE__ ":" APR_STRINGIFY(__LINE__))
 
 #ifdef __cplusplus
 extern "C" {
@@ -79,6 +84,16 @@ extern "C" {
 #define HTTP2_CONNECTION_PREFIX "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
 
 
+
+/* ------------------------------------- */
+typedef struct serf_http2_protocol_t serf_http2_protocol_t;
+typedef struct serf_http2_stream_t serf_http2_stream_t;
+
+apr_status_t
+serf_http2__enqueue_frame(serf_http2_protocol_t *h2,
+                          serf_bucket_t *frame,
+                          int pump);
+
 #ifdef __cplusplus
 }
 #endif


Reply via email to