Author: rhuijben
Date: Mon Nov 16 10:32:49 2015
New Revision: 1714539

URL: http://svn.apache.org/viewvc?rev=1714539&view=rev
Log:
Add a basic 'incoming request bucket' that can read HTTP/1 style requests
as they would be received on the server. This should be the exact opposite
of the current request bucket, but currently it is still a bit limited.

(The next commit will actually use this on the server side).

* buckets/request_buckets.c
  (incoming_rq_status_t): New enum.
  (incoming_request_context_t): New struct.

  (serf_bucket_incoming_request_create,
   serf_incoming_rq_parse_rqline,
   serf_incoming_rq_parse_headerline,
   serf_incoming_rq_wait_for,
   serf_incoming_rq_read,
   serf_incoming_rq_readline,
   serf_incoming_rq_read_iovec,
   serf_incoming_rq_peek,
   serf_incoming_rq_destroy,
   serf_bucket_incoming_request_read,
   serf_bucket_incoming_request_wait_for_headers): New functions.
  (serf_bucket_type_incoming_request): New bucket type.

* serf_bucket_types.h
  (serf_bucket_type_incoming_request): New constant.
  (SERF_BUCKET_IS_INCOMING_REQUEST): New define.
  (serf_bucket_incoming_request_create,
   serf_bucket_incoming_request_read,
   serf_bucket_incoming_request_wait_for_headers): New functions.

Modified:
    serf/trunk/buckets/request_buckets.c
    serf/trunk/serf_bucket_types.h

Modified: serf/trunk/buckets/request_buckets.c
URL: http://svn.apache.org/viewvc/serf/trunk/buckets/request_buckets.c?rev=1714539&r1=1714538&r2=1714539&view=diff
==============================================================================
--- serf/trunk/buckets/request_buckets.c (original)
+++ serf/trunk/buckets/request_buckets.c Mon Nov 16 10:32:49 2015
@@ -276,3 +276,351 @@ const serf_bucket_type_t serf_bucket_typ
     serf_request_set_config,
 };
 
+typedef enum incoming_rq_status_t
+{
+    STATE_INIT,
+    STATE_HEADERS,
+    STATE_PREBODY,
+    STATE_BODY,
+    STATE_TRAILERS,
+    STATE_DONE
+} incoming_rq_status_t;
+
+typedef struct incoming_request_context_t {
+    const char *method;
+    const char *path_raw;
+    int version;
+
+    serf_bucket_t *stream;
+    serf_bucket_t *headers;
+    serf_bucket_t *body;
+
+    incoming_rq_status_t state;
+    bool expect_trailers;
+
+    /* Buffer for accumulating a line from the request. */
+    serf_linebuf_t linebuf;
+
+} incoming_request_context_t;
+
+serf_bucket_t *serf_bucket_incoming_request_create(
+                      serf_bucket_t *stream,
+                      serf_bucket_alloc_t *allocator)
+{
+    incoming_request_context_t *ctx;
+
+    ctx = serf_bucket_mem_calloc(allocator, sizeof(*ctx));
+
+    ctx->stream = stream;
+    ctx->state = STATE_INIT;
+    ctx->headers = serf_bucket_headers_create(allocator);
+    serf_linebuf_init(&ctx->linebuf);
+
+    return serf_bucket_create(&serf_bucket_type_incoming_request,
+                              allocator, ctx);
+}
+
+static apr_status_t serf_incoming_rq_parse_rqline(serf_bucket_t *bucket)
+{
+    incoming_request_context_t *ctx = bucket->data;
+    const char *spc, *spc2;
+
+    if (ctx->linebuf.used == 0) {
+        return SERF_ERROR_TRUNCATED_STREAM;
+    }
+
+    /* ### This may need some security review if this is used in production
+            code */
+    spc = memchr(ctx->linebuf.line, ' ', ctx->linebuf.used);
+
+    if (spc)
+        ctx->method = serf_bstrmemdup(bucket->allocator, ctx->linebuf.line,
+                                      spc - ctx->linebuf.line);
+    else
+        return SERF_ERROR_TRUNCATED_STREAM;
+
+    spc2 = memchr(spc + 1, ' ', ctx->linebuf.used - (ctx->linebuf.line - spc)
+                                - 1);
+
+    if (spc2)
+        ctx->path_raw = serf_bstrmemdup(bucket->allocator, spc + 1,
+                                        (spc2 - spc-1));
+    else
+        return SERF_ERROR_TRUNCATED_STREAM;
+
+    ctx->version = SERF_HTTP_11; /* ### Parse! */
+    ctx->state++;
+
+    return APR_SUCCESS;
+}
+
+static apr_status_t serf_incoming_rq_parse_headerline(serf_bucket_t *bucket)
+{
+    incoming_request_context_t *ctx = bucket->data;
+    const char *split;
+
+    if (ctx->linebuf.used == 0) {
+        ctx->state++;
+        return APR_SUCCESS;
+    }
+
+    split = memchr(ctx->linebuf.line, ':', ctx->linebuf.used);
+
+    serf_bucket_headers_setx(ctx->headers,
+                             ctx->linebuf.line, (split - ctx->linebuf.line),
+                             TRUE /* copy */,
+                             split + 2,
+                             ctx->linebuf.used - (split - ctx->linebuf.line) - 2,
+                             TRUE /* copy */);
+
+    return APR_SUCCESS;
+}
+
+static apr_status_t serf_incoming_rq_wait_for(serf_bucket_t *bucket,
+                                              incoming_rq_status_t wait_for)
+{
+    incoming_request_context_t *ctx = bucket->data;
+    apr_status_t status;
+
+    if (ctx->state == STATE_TRAILERS && wait_for == STATE_BODY) {
+        /* We are done with the body, but not with the request.
+           Can't return EOF yet */
+        wait_for = STATE_DONE;
+    }
+
+    while (ctx->state < wait_for) {
+        switch (ctx->state) {
+            case STATE_INIT:
+                status = serf_linebuf_fetch(&ctx->linebuf, ctx->stream,
+                                            SERF_NEWLINE_ANY);
+                if (status)
+                    return status;
+
+                status = serf_incoming_rq_parse_rqline(bucket);
+                if (status)
+                    return status;
+                break;
+            case STATE_HEADERS:
+            case STATE_TRAILERS:
+                status = serf_linebuf_fetch(&ctx->linebuf, ctx->stream,
+                                            SERF_NEWLINE_ANY);
+                if (status)
+                    return status;
+
+                status = serf_incoming_rq_parse_headerline(bucket);
+                if (status)
+                    return status;
+                break;
+            case STATE_PREBODY:
+                /* TODO: Determine the body type.. Wrap bucket if necessary,
+                         etc.*/
+
+                /* What kind of body do we expect */
+                {
+                    const char *te;
+
+                    ctx->body = ctx->stream;
+                    te = serf_bucket_headers_get(ctx->headers, "Transfer-Encoding");
+
+                    if (te && strcasecmp(te, "chunked") == 0) {
+                        ctx->body = serf_bucket_dechunk_create(ctx->stream,
+                                                               bucket->allocator);
+                        ctx->expect_trailers = true;
+                    }
+                    else {
+                        const char *cl;
+
+                        cl = serf_bucket_headers_get(ctx->headers, "Content-Length");
+
+                        if (cl) {
+                            apr_uint64_t length;
+                            length = apr_strtoi64(cl, NULL, 10);
+                            if (errno == ERANGE) {
+                                return APR_FROM_OS_ERROR(ERANGE);
+                            }
+                            ctx->body = serf_bucket_response_body_create(
+                                          ctx->body, length, bucket->allocator);
+                        }
+                    }
+                    ctx->state++;
+                }
+                break;
+            case STATE_DONE:
+                break;
+            default:
+                return APR_EGENERAL; /* Should never happen */
+        }
+    }
+
+    return (ctx->state == STATE_DONE) ? APR_EOF : APR_SUCCESS;
+}
+
+static apr_status_t serf_incoming_rq_read(serf_bucket_t *bucket,
+                                          apr_size_t requested,
+                                          const char **data,
+                                          apr_size_t *len)
+{
+    incoming_request_context_t *ctx = bucket->data;
+    apr_status_t status;
+
+    status = serf_incoming_rq_wait_for(bucket, STATE_BODY);
+    if (status || !ctx->body) {
+        *len = 0;
+        return status ? status : APR_EOF;
+    }
+
+    status = serf_bucket_read(ctx->body, requested, data, len);
+    if (APR_STATUS_IS_EOF(status) && ctx->expect_trailers) {
+        ctx->state = STATE_TRAILERS;
+        status = APR_SUCCESS;
+    }
+    return status;
+}
+
+static apr_status_t serf_incoming_rq_readline(serf_bucket_t *bucket, int acceptable,
+                                              int *found,
+                                              const char **data, apr_size_t *len)
+{
+    incoming_request_context_t *ctx = bucket->data;
+    apr_status_t status;
+
+    status = serf_incoming_rq_wait_for(bucket, STATE_BODY);
+    if (status || !ctx->body) {
+        *found = 0;
+        *len = 0;
+        return status ? status : APR_EOF;
+    }
+
+    status = serf_bucket_readline(ctx->body, acceptable, found, data, len);
+    if (APR_STATUS_IS_EOF(status) && ctx->expect_trailers) {
+        ctx->state = STATE_TRAILERS;
+        status = APR_SUCCESS;
+    }
+    return status;
+}
+
+static apr_status_t serf_incoming_rq_read_iovec(serf_bucket_t *bucket,
+                                                apr_size_t requested,
+                                                int vecs_size,
+                                                struct iovec *vecs,
+                                                int *vecs_used)
+{
+    incoming_request_context_t *ctx = bucket->data;
+    apr_status_t status;
+
+    status = serf_incoming_rq_wait_for(bucket, STATE_BODY);
+    if (status || !ctx->body) {
+        *vecs_used = 0;
+        return status ? status : APR_EOF;
+    }
+
+    status = serf_bucket_read_iovec(ctx->body, requested, vecs_size,
+                                    vecs, vecs_used);
+    if (APR_STATUS_IS_EOF(status) && ctx->expect_trailers) {
+        ctx->state = STATE_TRAILERS;
+        status = APR_SUCCESS;
+    }
+    return status;
+}
+
+static apr_status_t serf_incoming_rq_peek(serf_bucket_t *bucket,
+                                          const char **data,
+                                          apr_size_t *len)
+{
+    incoming_request_context_t *ctx = bucket->data;
+    apr_status_t status;
+
+    status = serf_incoming_rq_wait_for(bucket, STATE_BODY);
+    if (status || !ctx->body) {
+        *len = 0;
+
+        if (SERF_BUCKET_READ_ERROR(status))
+            return status;
+        else if (APR_STATUS_IS_EOF(status))
+            return SERF_ERROR_TRUNCATED_STREAM;
+
+        return status ? APR_SUCCESS : APR_EOF;
+    }
+
+    status = serf_bucket_peek(ctx->body, data, len);
+    if (APR_STATUS_IS_EOF(status) && ctx->expect_trailers) {
+        ctx->state = STATE_TRAILERS;
+        status = APR_SUCCESS;
+    }
+    return status;
+}
+
+static void serf_incoming_rq_destroy(serf_bucket_t *bucket)
+{
+    incoming_request_context_t *ctx = bucket->data;
+
+    if (ctx->method)
+        serf_bucket_mem_free(bucket->allocator, (void*)ctx->method);
+    if (ctx->path_raw)
+        serf_bucket_mem_free(bucket->allocator, (void*)ctx->path_raw);
+    if (ctx->headers)
+        serf_bucket_destroy(ctx->headers);
+    if (ctx->body)
+        serf_bucket_destroy(ctx->body);
+    else if (ctx->stream)
+      serf_bucket_destroy(ctx->stream);
+
+    serf_default_destroy_and_data(bucket);
+}
+
+apr_status_t serf_bucket_incoming_request_read(
+                  serf_bucket_t **headers,
+                  const char **method,
+                  const char **path,
+                  int *http_version,
+                  serf_bucket_t *bucket)
+{
+    incoming_request_context_t *ctx = bucket->data;
+    apr_status_t status;
+
+    status = serf_incoming_rq_wait_for(bucket, STATE_BODY);
+    if (status) {
+        if (headers)
+            *headers = NULL;
+        if (method)
+            *method = NULL;
+        if (path)
+            *path = NULL;
+        if (http_version)
+            *http_version = 0;
+
+        return status;
+    }
+
+    if (headers)
+        *headers = ctx->headers;
+    if (method)
+        *method = ctx->method;
+    if (path)
+        *path = ctx->path_raw;
+    if (http_version)
+        *http_version = ctx->version;
+
+    return APR_SUCCESS;
+}
+
+apr_status_t serf_bucket_incoming_request_wait_for_headers(
+                  serf_bucket_t *bucket)
+{
+    return serf_incoming_rq_wait_for(bucket, STATE_BODY);
+}
+
+
+const serf_bucket_type_t serf_bucket_type_incoming_request = {
+    "INCOMING-REQUEST",
+    serf_incoming_rq_read,
+    serf_incoming_rq_readline,
+    serf_incoming_rq_read_iovec,
+    serf_default_read_for_sendfile,
+    serf_buckets_are_v2,
+    serf_incoming_rq_peek,
+    serf_incoming_rq_destroy,
+    serf_default_read_bucket,
+    serf_default_get_remaining,
+    serf_default_ignore_config
+};

Modified: serf/trunk/serf_bucket_types.h
URL: http://svn.apache.org/viewvc/serf/trunk/serf_bucket_types.h?rev=1714539&r1=1714538&r2=1714539&view=diff
==============================================================================
--- serf/trunk/serf_bucket_types.h (original)
+++ serf/trunk/serf_bucket_types.h Mon Nov 16 10:32:49 2015
@@ -81,6 +81,30 @@ void serf_bucket_request_set_root(
     serf_bucket_t *bucket,
     const char *root_url);
 
+
+/* ==================================================================== */
+
+extern const serf_bucket_type_t serf_bucket_type_incoming_request;
+#define SERF_BUCKET_IS_INCOMING_REQUEST(b)                              \
+                                SERF_BUCKET_CHECK((b), incoming_request)
+
+serf_bucket_t *serf_bucket_incoming_request_create(
+                                      serf_bucket_t *body,
+                                      serf_bucket_alloc_t *allocator);
+
+/* All output arguments are optional. Waits until the request line and headers
+   have arrived, returning the normal read statuses until then. */
+/* ### Add RESULT_POOL argument? */
+apr_status_t serf_bucket_incoming_request_read(
+                                      serf_bucket_t **headers,
+                                      const char **method,
+                                      const char **path,
+                                      int *http_version,
+                                      serf_bucket_t *request);
+
+apr_status_t serf_bucket_incoming_request_wait_for_headers(
+                                      serf_bucket_t *response);
+
 /* ==================================================================== */
 
 


Reply via email to