Thanks, Graham, for this great patch; comments inline...
On Sun, Oct 4, 2015 at 12:10 PM, <[email protected]> wrote:
> Author: minfrin
> Date: Sun Oct 4 10:10:51 2015
> New Revision: 1706669
>
> URL: http://svn.apache.org/viewvc?rev=1706669&view=rev
> Log:
> core: Extend support for asynchronous write completion from the
> network filter to any connection or request filter.
>
[]
>
> Modified: httpd/httpd/trunk/include/util_filter.h
> URL:
> http://svn.apache.org/viewvc/httpd/httpd/trunk/include/util_filter.h?rev=1706669&r1=1706668&r2=1706669&view=diff
> ==============================================================================
> --- httpd/httpd/trunk/include/util_filter.h (original)
> +++ httpd/httpd/trunk/include/util_filter.h Sun Oct 4 10:10:51 2015
> @@ -278,6 +278,13 @@ struct ap_filter_t {
> * to the request_rec, except that it is used for connection filters.
> */
> conn_rec *c;
> +
> + /** Buffered data associated with the current filter. */
> + apr_bucket_brigade *bb;
> +
> + /** Dedicated pool to use for deferred writes. */
> + apr_pool_t *deferred_pool;
Could we make these opaque, e.g.:
# util_filter.h:
struct ap_filter_data;
struct ap_filter_t {
...
struct ap_filter_data *data;
};
# util_filter.c:
struct ap_filter_data
{
/** Buffered data associated with the current filter. */
apr_bucket_brigade *bb;
/** Dedicated pool to use for deferred writes. */
apr_pool_t *deferred_pool;
};
This would prevent them from being mangled elsewhere (they are
internal to util_filter.c anyway).
It could also possibly avoid walking the whole brigade on every
ap_filter_reinstate_brigade() call...
> +
> };
>
[]
>
> Modified: httpd/httpd/trunk/modules/http/http_request.c
> URL:
> http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http/http_request.c?rev=1706669&r1=1706668&r2=1706669&view=diff
> ==============================================================================
> --- httpd/httpd/trunk/modules/http/http_request.c (original)
> +++ httpd/httpd/trunk/modules/http/http_request.c Sun Oct 4 10:10:51 2015
> @@ -256,6 +256,14 @@ AP_DECLARE(void) ap_process_request_afte
> apr_bucket *b;
> conn_rec *c = r->connection;
>
> + /* Find the last request, taking into account internal
> + * redirects. We want to send the EOR bucket at the end of
> + * all the buckets so it does not jump the queue.
> + */
> + while (r->next) {
> + r = r->next;
> + }
> +
> /* Send an EOR bucket through the output filter chain. When
> * this bucket is destroyed, the request will be logged and
> * its pool will be freed
> @@ -264,8 +272,8 @@ AP_DECLARE(void) ap_process_request_afte
> b = ap_bucket_eor_create(c->bucket_alloc, r);
> APR_BRIGADE_INSERT_HEAD(bb, b);
>
> - ap_pass_brigade(c->output_filters, bb);
> -
> + ap_pass_brigade(r->output_filters, bb);;;
Nit: stray trailing ";;" after the call.
> +
[]
> Modified: httpd/httpd/trunk/server/mpm/event/event.c
> URL:
> http://svn.apache.org/viewvc/httpd/httpd/trunk/server/mpm/event/event.c?rev=1706669&r1=1706668&r2=1706669&view=diff
> ==============================================================================
> --- httpd/httpd/trunk/server/mpm/event/event.c (original)
> +++ httpd/httpd/trunk/server/mpm/event/event.c Sun Oct 4 10:10:51 2015
> @@ -1146,19 +1146,38 @@ read_request:
> }
>
[]
> +
> + rindex = apr_hash_first(NULL, c->filters);
> + while (rindex) {
> + ap_filter_t *f = apr_hash_this_val(rindex);
> +
> + if (!APR_BRIGADE_EMPTY(f->bb)) {
> +
> + rv = ap_pass_brigade(f, c->empty);
> + apr_brigade_cleanup(c->empty);
> + if (APR_SUCCESS != rv) {
> + ap_log_cerror(
> + APLOG_MARK, APLOG_DEBUG, rv, c, APLOGNO(00470)
> + "write failure in '%s' output filter",
> f->frec->name);
> + break;
> + }
> +
> + if (ap_filter_should_yield(f)) {
> + data_in_output_filters = 1;
> + }
> + }
> +
> + rindex = apr_hash_next(rindex);
> }
This pattern seems to be shared by all (relevant) MPMs; couldn't we
make it a function? Maybe ap_reinstate_conn()?
Also, it seems that we leak the hash iterator here (on c->pool).
Couldn't we use a (singly) linked list for c->filters, since
ap_filter_t already has a 'next' field?
[]
>
> Modified: httpd/httpd/trunk/server/mpm/motorz/motorz.c
> URL:
> http://svn.apache.org/viewvc/httpd/httpd/trunk/server/mpm/motorz/motorz.c?rev=1706669&r1=1706668&r2=1706669&view=diff
> ==============================================================================
> --- httpd/httpd/trunk/server/mpm/motorz/motorz.c (original)
> +++ httpd/httpd/trunk/server/mpm/motorz/motorz.c Sun Oct 4 10:10:51 2015
> @@ -359,21 +359,38 @@ static apr_status_t motorz_io_process(mo
[]
> +
> + rindex = apr_hash_first(NULL, c->filters);
> + while (rindex) {
> + ap_filter_t *f = apr_hash_this_val(rindex);
> +
> + if (!APR_BRIGADE_EMPTY(f->bb)) {
>
> - rv = output_filter->frec->filter_func.out_func(output_filter,
> - NULL);
> + rv = ap_pass_brigade(f, c->empty);
> + apr_brigade_cleanup(c->empty);
> + if (APR_SUCCESS != rv) {
> + ap_log_cerror(
> + APLOG_MARK, APLOG_DEBUG, rv, c,
> APLOGNO(02848)
> + "write failure in '%s' output filter",
> f->frec->name);
> + break;
> + }
> +
> + if (ap_filter_should_yield(f)) {
> + data_in_output_filters = 1;
> + }
> + }
> +
> + rindex = apr_hash_next(rindex);
> + }
Same here.
[]
> Modified: httpd/httpd/trunk/server/mpm/simple/simple_io.c
> URL:
> http://svn.apache.org/viewvc/httpd/httpd/trunk/server/mpm/simple/simple_io.c?rev=1706669&r1=1706668&r2=1706669&view=diff
> ==============================================================================
> --- httpd/httpd/trunk/server/mpm/simple/simple_io.c (original)
> +++ httpd/httpd/trunk/server/mpm/simple/simple_io.c Sun Oct 4 10:10:51 2015
> @@ -92,20 +92,37 @@ static apr_status_t simple_io_process(si
[]
> +
> + rindex = apr_hash_first(NULL, c->filters);
> + while (rindex) {
> + ap_filter_t *f = apr_hash_this_val(rindex);
> +
> + if (!APR_BRIGADE_EMPTY(f->bb)) {
>
> - rv = output_filter->frec->filter_func.out_func(output_filter,
> - NULL);
> + rv = ap_pass_brigade(f, c->empty);
> + apr_brigade_cleanup(c->empty);
> + if (APR_SUCCESS != rv) {
> + ap_log_cerror(
> + APLOG_MARK, APLOG_DEBUG, rv, c,
> APLOGNO(00249)
> + "write failure in '%s' output filter",
> f->frec->name);
> + break;
> + }
> +
> + if (ap_filter_should_yield(f)) {
> + data_in_output_filters = 1;
> + }
> + }
> +
> + rindex = apr_hash_next(rindex);
> + }
And here as well.
[]
>
> Modified: httpd/httpd/trunk/server/util_filter.c
> URL:
> http://svn.apache.org/viewvc/httpd/httpd/trunk/server/util_filter.c?rev=1706669&r1=1706668&r2=1706669&view=diff
> ==============================================================================
> --- httpd/httpd/trunk/server/util_filter.c (original)
> +++ httpd/httpd/trunk/server/util_filter.c Sun Oct 4 10:10:51 2015
[]
> @@ -635,7 +653,8 @@ AP_DECLARE(apr_status_t) ap_save_brigade
> apr_status_t rv, srv = APR_SUCCESS;
>
> /* If have never stored any data in the filter, then we had better
> - * create an empty bucket brigade so that we can concat.
> + * create an empty bucket brigade so that we can concat. Register
> + * a cleanup to zero out the pointer if the pool is cleared.
Maybe this comment belongs in ap_filter_setaside_brigade()?
> */
> if (!(*saveto)) {
> *saveto = apr_brigade_create(p, f->c->bucket_alloc);
> @@ -673,6 +692,248 @@ AP_DECLARE(apr_status_t) ap_save_brigade
> return srv;
> }
>
> +static apr_status_t filters_cleanup(void *data)
> +{
> + ap_filter_t **key = data;
> +
> + apr_hash_set((*key)->c->filters, key, sizeof(ap_filter_t **), NULL);
> +
> + return APR_SUCCESS;
> +}
Shouldn't we call filters_cleanup() from ap_remove_output_filter() too?
> +
> +AP_DECLARE(apr_status_t) ap_filter_setaside_brigade(ap_filter_t *f,
> + apr_bucket_brigade *bb)
> +{
> + int loglevel = ap_get_conn_module_loglevel(f->c, APLOG_MODULE_INDEX);
> +
> + if (loglevel >= APLOG_TRACE6) {
> + ap_log_cerror(
> + APLOG_MARK, APLOG_TRACE6, 0, f->c,
> + "setaside %s brigade to %s brigade in '%s' output filter",
> + (APR_BRIGADE_EMPTY(bb) ? "empty" : "full"),
> + (!f->bb || APR_BRIGADE_EMPTY(f->bb) ? "empty" : "full"),
> f->frec->name);
> + }
> +
> + if (!APR_BRIGADE_EMPTY(bb)) {
> + apr_pool_t *pool;
> + /*
> + * Set aside the brigade bb within f->bb.
> + */
> + if (!f->bb) {
> + ap_filter_t **key;
> +
> + pool = f->r ? f->r->pool : f->c->pool;
> +
> + key = apr_palloc(pool, sizeof(ap_filter_t **));
> + *key = f;
> + apr_hash_set(f->c->filters, key, sizeof(ap_filter_t **), f);
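Why not simply use:
key = apr_pmemdup(pool, &f, sizeof f);
apr_hash_set(f->c->filters, key, sizeof f, f);
here (passing 'key', not '&key': the hash keeps a pointer to the key
bytes, and '&key' would point to the stack), and:
ap_filter_t *f = data;
apr_hash_set(f->c->filters, &f, sizeof f, NULL);
in filters_cleanup() above (registering the cleanup with 'f' as its
data rather than 'key')?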
A linked list (using the 'next' field) could possibly be simpler here
too, though filters_cleanup() above would then have to walk the list
from its head each time to remove an entry.
> +
> + f->bb = apr_brigade_create(pool, f->c->bucket_alloc);
> +
> + apr_pool_pre_cleanup_register(pool, key, filters_cleanup);
> +
> + }
> +
> + /* decide what pool we setaside to, request pool or deferred pool? */
> + if (f->r) {
> + pool = f->r->pool;
> + APR_BRIGADE_CONCAT(f->bb, bb);
> + }
> + else {
> + if (!f->deferred_pool) {
> + apr_pool_create(&f->deferred_pool, f->c->pool);
> + apr_pool_tag(f->deferred_pool, "deferred_pool");
> + }
> + pool = f->deferred_pool;
> + return ap_save_brigade(f, &f->bb, &bb, pool);
> + }
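Shouldn't the ap_save_brigade() call be moved after the if/else, so
that both branches set the buckets aside into the chosen pool? As it
stands, 'pool = f->r->pool' is assigned in the request branch but never
used.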
> +
> + }
> + else if (f->deferred_pool) {
> + /*
> + * There are no more requests in the pipeline. We can just clear the
> + * pool.
> + */
> + apr_brigade_cleanup(f->bb);
> + apr_pool_clear(f->deferred_pool);
> + }
> + return APR_SUCCESS;
> +}
> +
> +AP_DECLARE(apr_status_t) ap_filter_reinstate_brigade(ap_filter_t *f,
> + apr_bucket_brigade *bb,
> + apr_bucket **flush_upto)
> +{
[]
> + apr_bucket *bucket, *next;
> + apr_size_t bytes_in_brigade, non_file_bytes_in_brigade;
> + int eor_buckets_in_brigade, morphing_bucket_in_brigade;
> + int loglevel = ap_get_conn_module_loglevel(f->c, APLOG_MODULE_INDEX);
> +
> + if (loglevel >= APLOG_TRACE6) {
> + ap_log_cerror(
> + APLOG_MARK, APLOG_TRACE6, 0, f->c,
> + "reinstate %s brigade to %s brigade in '%s' output filter",
> + (!f->bb || APR_BRIGADE_EMPTY(f->bb) ? "empty" : "full"),
> + (APR_BRIGADE_EMPTY(bb) ? "empty" : "full"), f->frec->name);
> + }
> +
> + if (f->bb && !APR_BRIGADE_EMPTY(f->bb)) {
> + APR_BRIGADE_PREPEND(bb, f->bb);
> + }
> +
> + /*
> + * Determine if and up to which bucket we need to do a blocking write:
> + *
> + * a) The brigade contains a flush bucket: Do a blocking write
> + * of everything up that point.
> + *
> + * b) The request is in CONN_STATE_HANDLER state, and the brigade
> + * contains at least THRESHOLD_MAX_BUFFER bytes in non-file
> + * buckets: Do blocking writes until the amount of data in the
> + * buffer is less than THRESHOLD_MAX_BUFFER. (The point of this
> + * rule is to provide flow control, in case a handler is
> + * streaming out lots of data faster than the data can be
> + * sent to the client.)
> + *
> + * c) The request is in CONN_STATE_HANDLER state, and the brigade
> + * contains at least MAX_REQUESTS_IN_PIPELINE EOR buckets:
> + * Do blocking writes until less than MAX_REQUESTS_IN_PIPELINE EOR
> + * buckets are left. (The point of this rule is to prevent too many
> + * FDs being kept open by pipelined requests, possibly allowing a
> + * DoS).
> + *
> + * d) The request is being served by a connection filter and the
> + * brigade contains a morphing bucket: If there was no other
> + * reason to do a blocking write yet, try reading the bucket. If its
> + * contents fit into memory before THRESHOLD_MAX_BUFFER is reached,
> + * everything is fine. Otherwise we need to do a blocking write the
> + * up to and including the morphing bucket, because ap_save_brigade()
> + * would read the whole bucket into memory later on.
> + */
> +
> + *flush_upto = NULL;
> +
> + bytes_in_brigade = 0;
> + non_file_bytes_in_brigade = 0;
> + eor_buckets_in_brigade = 0;
> + morphing_bucket_in_brigade = 0;
Per the earlier suggestion to make ap_filter_data opaque, these
counters could be part of that struct (and thus kept across calls).
We could then PREPEND after the loop below, and potentially avoid
walking the same buckets each time.
> +
> + for (bucket = APR_BRIGADE_FIRST(bb); bucket != APR_BRIGADE_SENTINEL(bb);
> + bucket = next) {
> + next = APR_BUCKET_NEXT(bucket);
> +
> + if (!APR_BUCKET_IS_METADATA(bucket)) {
> + if (bucket->length == (apr_size_t)-1) {
> + /*
> + * A setaside of morphing buckets would read everything into
> + * memory. Instead, we will flush everything up to and
> + * including this bucket.
> + */
> + morphing_bucket_in_brigade = 1;
> + }
> + else {
> + bytes_in_brigade += bucket->length;
> + if (!APR_BUCKET_IS_FILE(bucket))
> + non_file_bytes_in_brigade += bucket->length;
> + }
> + }
> + else if (AP_BUCKET_IS_EOR(bucket)) {
> + eor_buckets_in_brigade++;
> + }
> +
> + if (APR_BUCKET_IS_FLUSH(bucket)
> + || non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER
> + || (!f->r && morphing_bucket_in_brigade)
> + || eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE) {
> + /* this segment of the brigade MUST be sent before returning. */
> +
> + if (loglevel >= APLOG_TRACE6) {
> + char *reason = APR_BUCKET_IS_FLUSH(bucket) ?
> + "FLUSH bucket" :
> + (non_file_bytes_in_brigade >=
> THRESHOLD_MAX_BUFFER) ?
> + "THRESHOLD_MAX_BUFFER" :
> + (!f->r && morphing_bucket_in_brigade) ?
> "morphing bucket" :
> + "MAX_REQUESTS_IN_PIPELINE";
> + ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, f->c,
> + "will flush because of %s", reason);
> + ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, f->c,
> + "seen in brigade%s: bytes: %" APR_SIZE_T_FMT
> + ", non-file bytes: %" APR_SIZE_T_FMT ", eor "
> + "buckets: %d, morphing buckets: %d",
> + flush_upto == NULL ? " so far"
> + : " since last flush point",
> + bytes_in_brigade,
> + non_file_bytes_in_brigade,
> + eor_buckets_in_brigade,
> + morphing_bucket_in_brigade);
> + }
> + /*
> + * Defer the actual blocking write to avoid doing many writes.
> + */
> + *flush_upto = next;
> +
> + bytes_in_brigade = 0;
> + non_file_bytes_in_brigade = 0;
> + eor_buckets_in_brigade = 0;
> + morphing_bucket_in_brigade = 0;
> + }
> + }
Regards,
Yann.