Author: rhuijben Date: Mon Nov 16 10:32:49 2015 New Revision: 1714539 URL: http://svn.apache.org/viewvc?rev=1714539&view=rev Log: Add a basic 'incoming request bucket' that can read a HTTP/1 style requests as it would be received on the server. This should be the 100% opposite of the current request bucket, but currently it is still a bit limited.
(The next commit will actually use this on the server side). * buckets/request_buckets.c (incoming_rq_status_t): New enum. (incoming_request_context_t): New struct. (serf_bucket_incoming_request_create, serf_incoming_rq_parse_rqline, serf_incoming_rq_parse_headerline, serf_incoming_rq_wait_for, serf_incoming_rq_read, serf_incoming_rq_readline, serf_incoming_rq_read_iovec, serf_incoming_rq_peek, serf_incoming_rq_destroy, serf_bucket_incoming_request_read, serf_bucket_incoming_request_wait_for_headers): New function. (serf_bucket_type_incoming_request): New bucket type. * serf_bucket_types.h (serf_bucket_type_incoming_request): New constant. (SERF_BUCKET_IS_INCOMING_REQUEST): New define. (serf_bucket_incoming_request_create, serf_bucket_incoming_request_read, serf_bucket_incoming_request_wait_for_headers): New function. Modified: serf/trunk/buckets/request_buckets.c serf/trunk/serf_bucket_types.h Modified: serf/trunk/buckets/request_buckets.c URL: http://svn.apache.org/viewvc/serf/trunk/buckets/request_buckets.c?rev=1714539&r1=1714538&r2=1714539&view=diff ============================================================================== --- serf/trunk/buckets/request_buckets.c (original) +++ serf/trunk/buckets/request_buckets.c Mon Nov 16 10:32:49 2015 @@ -276,3 +276,351 @@ const serf_bucket_type_t serf_bucket_typ serf_request_set_config, }; +typedef enum incoming_rq_status_t +{ + STATE_INIT, + STATE_HEADERS, + STATE_PREBODY, + STATE_BODY, + STATE_TRAILERS, + STATE_DONE +} incoming_rq_status_t; + +typedef struct incoming_request_context_t { + const char *method; + const char *path_raw; + int version; + + serf_bucket_t *stream; + serf_bucket_t *headers; + serf_bucket_t *body; + + incoming_rq_status_t state; + bool expect_trailers; + + /* Buffer for accumulating a line from the response. */ + serf_linebuf_t linebuf; + +} incoming_request_context_t; + +serf_bucket_t *serf_bucket_incoming_request_create( + serf_bucket_t *stream, + serf_bucket_alloc_t *allocator) +{ + incoming_request_context_t *ctx; + + ctx = serf_bucket_mem_calloc(allocator, sizeof(*ctx)); + + ctx->stream = stream; + ctx->state = STATE_INIT; + ctx->headers = serf_bucket_headers_create(allocator); + serf_linebuf_init(&ctx->linebuf); + + return serf_bucket_create(&serf_bucket_type_incoming_request, + allocator, ctx); +} + +static apr_status_t serf_incoming_rq_parse_rqline(serf_bucket_t *bucket) +{ + incoming_request_context_t *ctx = bucket->data; + const char *spc, *spc2; + + if (ctx->linebuf.used == 0) { + return SERF_ERROR_TRUNCATED_STREAM; + } + + /* ### This may need some security review if this is used in production + code */ + spc = memchr(ctx->linebuf.line, ' ', ctx->linebuf.used); + + if (spc) + ctx->method = serf_bstrmemdup(bucket->allocator, ctx->linebuf.line, + spc - ctx->linebuf.line); + else + return SERF_ERROR_TRUNCATED_STREAM; + + spc2 = memchr(spc + 1, ' ', ctx->linebuf.used - (ctx->linebuf.line - spc) + - 1); + + if (spc2) + ctx->path_raw = serf_bstrmemdup(bucket->allocator, spc + 1, + (spc2 - spc-1)); + else + return SERF_ERROR_TRUNCATED_STREAM; + + ctx->version = SERF_HTTP_11; /* ### Parse! */ + ctx->state++; + + return APR_SUCCESS; +} + +static apr_status_t serf_incoming_rq_parse_headerline(serf_bucket_t *bucket) +{ + incoming_request_context_t *ctx = bucket->data; + const char *split; + + if (ctx->linebuf.used == 0) { + ctx->state++; + return APR_SUCCESS; + } + + split = memchr(ctx->linebuf.line, ':', ctx->linebuf.used); + + serf_bucket_headers_setx(ctx->headers, + ctx->linebuf.line, (split - ctx->linebuf.line), + TRUE /* copy */, + split + 2, + ctx->linebuf.used - (split - ctx->linebuf.line) - 2, + TRUE /* copy */); + + return APR_SUCCESS; +} + +static apr_status_t serf_incoming_rq_wait_for(serf_bucket_t *bucket, + incoming_rq_status_t wait_for) +{ + incoming_request_context_t *ctx = bucket->data; + apr_status_t status; + + if (ctx->state == STATE_TRAILERS && wait_for == STATE_BODY) { + /* We are done with the body, but not with the request. + Can't return EOF yet */ + wait_for = STATE_DONE; + } + + while (ctx->state < wait_for) { + switch (ctx->state) { + case STATE_INIT: + status = serf_linebuf_fetch(&ctx->linebuf, ctx->stream, + SERF_NEWLINE_ANY); + if (status) + return status; + + status = serf_incoming_rq_parse_rqline(bucket); + if (status) + return status; + break; + case STATE_HEADERS: + case STATE_TRAILERS: + status = serf_linebuf_fetch(&ctx->linebuf, ctx->stream, + SERF_NEWLINE_ANY); + if (status) + return status; + + status = serf_incoming_rq_parse_headerline(bucket); + if (status) + return status; + break; + case STATE_PREBODY: + /* TODO: Determine the body type.. Wrap bucket if necessary, + etc.*/ + + /* What kind of body do we expect */ + { + const char *te; + + ctx->body = ctx->stream; + te = serf_bucket_headers_get(ctx->headers, "Transfer-Encoding"); + + if (te && strcasecmp(te, "chunked") == 0) { + ctx->body = serf_bucket_dechunk_create(ctx->stream, + bucket->allocator); + ctx->expect_trailers = true; + } + else { + const char *cl; + + cl = serf_bucket_headers_get(ctx->headers, "Content-Length"); + + if (cl) { + apr_uint64_t length; + length = apr_strtoi64(cl, NULL, 10); + if (errno == ERANGE) { + return APR_FROM_OS_ERROR(ERANGE); + } + ctx->body = serf_bucket_response_body_create( + ctx->body, length, bucket->allocator); + } + } + ctx->state++; + } + break; + case STATE_DONE: + break; + default: + return APR_EGENERAL; /* Should never happen */ + } + } + + return (ctx->state == STATE_DONE) ? APR_EOF : APR_SUCCESS; +} + +static apr_status_t serf_incoming_rq_read(serf_bucket_t *bucket, + apr_size_t requested, + const char **data, + apr_size_t *len) +{ + incoming_request_context_t *ctx = bucket->data; + apr_status_t status; + + status = serf_incoming_rq_wait_for(bucket, STATE_BODY); + if (status || !ctx->body) { + *len = 0; + return status ? status : APR_EOF; + } + + status = serf_bucket_read(ctx->body, requested, data, len); + if (APR_STATUS_IS_EOF(status) && ctx->expect_trailers) { + ctx->state = STATE_TRAILERS; + status = APR_SUCCESS; + } + return status; +} + +static apr_status_t serf_incoming_rq_readline(serf_bucket_t *bucket, int acceptable, + int *found, + const char **data, apr_size_t *len) +{ + incoming_request_context_t *ctx = bucket->data; + apr_status_t status; + + status = serf_incoming_rq_wait_for(bucket, STATE_BODY); + if (status || !ctx->body) { + *found = 0; + *len = 0; + return status ? status : APR_EOF; + } + + status = serf_bucket_readline(ctx->body, acceptable, found, data, len); + if (APR_STATUS_IS_EOF(status) && ctx->expect_trailers) { + ctx->state = STATE_TRAILERS; + status = APR_SUCCESS; + } + return status; +} + +static apr_status_t serf_incoming_rq_read_iovec(serf_bucket_t *bucket, + apr_size_t requested, + int vecs_size, + struct iovec *vecs, + int *vecs_used) +{ + incoming_request_context_t *ctx = bucket->data; + apr_status_t status; + + status = serf_incoming_rq_wait_for(bucket, STATE_BODY); + if (status || !ctx->body) { + *vecs_used = 0; + return status ? status : APR_EOF; + } + + status = serf_bucket_read_iovec(ctx->body, requested, vecs_size, + vecs, vecs_used); + if (APR_STATUS_IS_EOF(status) && ctx->expect_trailers) { + ctx->state = STATE_TRAILERS; + status = APR_SUCCESS; + } + return status; +} + +static apr_status_t serf_incoming_rq_peek(serf_bucket_t *bucket, + const char **data, + apr_size_t *len) +{ + incoming_request_context_t *ctx = bucket->data; + apr_status_t status; + + status = serf_incoming_rq_wait_for(bucket, STATE_BODY); + if (status || !ctx->body) { + *len = 0; + + if (SERF_BUCKET_READ_ERROR(status)) + return status; + else if (APR_STATUS_IS_EOF(status)) + return SERF_ERROR_TRUNCATED_STREAM; + + return status ? APR_SUCCESS : APR_EOF; + } + + status = serf_bucket_peek(ctx->body, data, len); + if (APR_STATUS_IS_EOF(status) && ctx->expect_trailers) { + ctx->state = STATE_TRAILERS; + status = APR_SUCCESS; + } + return status; +} + +static void serf_incoming_rq_destroy(serf_bucket_t *bucket) +{ + incoming_request_context_t *ctx = bucket->data; + + if (ctx->method) + serf_bucket_mem_free(bucket->allocator, (void*)ctx->method); + if (ctx->path_raw) + serf_bucket_mem_free(bucket->allocator, (void*)ctx->path_raw); + if (ctx->headers) + serf_bucket_destroy(ctx->headers); + if (ctx->body) + serf_bucket_destroy(ctx->body); + else if (ctx->stream) + serf_bucket_destroy(ctx->stream); + + serf_default_destroy_and_data(bucket); +} + +apr_status_t serf_bucket_incoming_request_read( + serf_bucket_t **headers, + const char **method, + const char **path, + int *http_version, + serf_bucket_t *bucket) +{ + incoming_request_context_t *ctx = bucket->data; + apr_status_t status; + + status = serf_incoming_rq_wait_for(bucket, STATE_BODY); + if (status) { + if (headers) + *headers = NULL; + if (method) + *method = NULL; + if (path) + *path = NULL; + if (http_version) + *http_version = 0; + + return status; + } + + if (headers) + *headers = ctx->headers; + if (method) + *method = ctx->method; + if (path) + *path = ctx->path_raw; + if (http_version) + *http_version = ctx->version; + + return APR_SUCCESS; +} + +apr_status_t serf_bucket_incoming_request_wait_for_headers( + serf_bucket_t *bucket) +{ + return serf_incoming_rq_wait_for(bucket, STATE_BODY); +} + + +const serf_bucket_type_t serf_bucket_type_incoming_request = { + "INCOMING-REQUEST", + serf_incoming_rq_read, + serf_incoming_rq_readline, + serf_incoming_rq_read_iovec, + serf_default_read_for_sendfile, + serf_buckets_are_v2, + serf_incoming_rq_peek, + serf_incoming_rq_destroy, + serf_default_read_bucket, + serf_default_get_remaining, + serf_default_ignore_config +}; Modified: serf/trunk/serf_bucket_types.h URL: http://svn.apache.org/viewvc/serf/trunk/serf_bucket_types.h?rev=1714539&r1=1714538&r2=1714539&view=diff ============================================================================== --- serf/trunk/serf_bucket_types.h (original) +++ serf/trunk/serf_bucket_types.h Mon Nov 16 10:32:49 2015 @@ -81,6 +81,30 @@ void serf_bucket_request_set_root( serf_bucket_t *bucket, const char *root_url); + +/* ==================================================================== */ + +extern const serf_bucket_type_t serf_bucket_type_incoming_request; +#define SERF_BUCKET_IS_INCOMING_REQUEST(b) \ + SERF_BUCKET_CHECK((b), incoming_request) + +serf_bucket_t *serf_bucket_incoming_request_create( + serf_bucket_t *body, + serf_bucket_alloc_t *allocator); + +/* All output arguments optional. Waits for the request line to have arrived + with the normal read responses. */ +/* ### Add RESULT_POOL argument? */ +apr_status_t serf_bucket_incoming_request_read( + serf_bucket_t **headers, + const char **method, + const char **path, + int *http_version, + serf_bucket_t *request); + +apr_status_t serf_bucket_incoming_request_wait_for_headers( + serf_bucket_t *response); + /* ==================================================================== */