On Thu, Mar 26, 2020 at 12:18 PM Ruediger Pluem <[email protected]> wrote:
>
> On 3/26/20 12:11 AM, Graham Leggett wrote:
> >
> > What it does do is work around buggy request filters (set it to connection)
> > or buggy connection filters (set it to network and get 2.4 behaviour)..
What it does is read/morph buckets before the connection filters; is that
because we can't pass some "buggy request_filter" morphing bucket to
the core/ssl output filters? Or, more exactly, to a further
ap_filter_setaside_brigade()?
If some filter (besides the core ones) needs to apr_bucket_read() morphing
buckets, it should possibly use ap_filter_reinstate_brigade_ex() with a
new AP_FILTER_REINSTATE_MORPH option (probably accompanied by
AP_FILTER_REINSTATE_NONBLOCKING for the usual/optional "first
nonblocking, then flush, then blocking" dance).
> >
> > Set AsyncFilter appropriately and you’ll at least narrow it down.
> >
> > The question you’re asking is “why is an async path being taken when
> > AP_MPMQ_IS_ASYNC is false”. The setasides and reinstates should be noops in
> > this situation.
>
> Having the setasides and reinstates as noops in this situation might help,
> but as far as I can tell they make no difference
> whether the MPM is sync or async.
> What makes you think that an async path is taken?
Not until WRITE_COMPLETION.
> Setting AsyncFilter to connection will very likely help (not tested yet) as
> it effectively removes the ap_request_core_filter.
I think I'm convinced we don't need the core request filter.
It seems to do nothing different from the next ssl_io_filter_output() or
ap_core_output_filter(), so is it just a matter of the morphing
buckets' lifetime?
Thanks to the EOR bucket mechanism, we assume that objects owned by a
request can be used safely provided they precede the EOR, an assumption
which IMHO we can extend to morphing buckets. If so, I think we can
avoid the apr_bucket_setaside() = ENOTIMPL issue by simply moving
morphing buckets between the user and filter brigades in
ap_filter_setaside/reinstate_brigade().
Just like FILE buckets, morphing buckets are just file descriptors
that use no memory until read, so if we move them on
setaside/reinstate we can simply ignore them for the flushing heuristic
and the flush_max_threshold accounting.
So ap_filter_reinstate_brigade() would set *flush_upto based on in-memory
limits only (and FLUSH buckets, of course); anything else can wait.
The blocking limit is then flush_max_pipelined, but we could split
flush_max_threshold into finer-grained _max_in_memory_threshold
and _max_total_threshold settings if needed.
Wouldn't that work (patch attached, passes test framework and Joe's repro)?
Regards,
Yann.
Index: server/core.c
===================================================================
--- server/core.c (revision 1875498)
+++ server/core.c (working copy)
@@ -122,7 +122,6 @@ AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, insert_n
/* Handles for core filters */
AP_DECLARE_DATA ap_filter_rec_t *ap_subreq_core_filter_handle;
-AP_DECLARE_DATA ap_filter_rec_t *ap_request_core_filter_handle;
AP_DECLARE_DATA ap_filter_rec_t *ap_core_output_filter_handle;
AP_DECLARE_DATA ap_filter_rec_t *ap_content_length_filter_handle;
AP_DECLARE_DATA ap_filter_rec_t *ap_core_input_filter_handle;
@@ -5916,9 +5915,6 @@ static void register_hooks(apr_pool_t *p)
ap_core_output_filter_handle =
ap_register_output_filter("CORE", ap_core_output_filter,
NULL, AP_FTYPE_NETWORK);
- ap_request_core_filter_handle =
- ap_register_output_filter("REQ_CORE", ap_request_core_filter,
- NULL, AP_FTYPE_CONNECTION - 1);
ap_subreq_core_filter_handle =
ap_register_output_filter("SUBREQ_CORE", ap_sub_req_output_filter,
NULL, AP_FTYPE_CONTENT_SET);
Index: modules/http/http_core.c
===================================================================
--- modules/http/http_core.c (revision 1875498)
+++ modules/http/http_core.c (working copy)
@@ -268,8 +268,6 @@ static int http_create_request(request_rec *r)
NULL, r, r->connection);
ap_add_output_filter_handle(ap_http_outerror_filter_handle,
NULL, r, r->connection);
- ap_add_output_filter_handle(ap_request_core_filter_handle,
- NULL, r, r->connection);
}
return OK;
Index: server/request.c
===================================================================
--- server/request.c (revision 1875498)
+++ server/request.c (working copy)
@@ -2058,124 +2058,6 @@ AP_CORE_DECLARE_NONSTD(apr_status_t) ap_sub_req_ou
return APR_SUCCESS;
}
-AP_CORE_DECLARE_NONSTD(apr_status_t) ap_request_core_filter(ap_filter_t *f,
- apr_bucket_brigade *bb)
-{
- apr_status_t status = APR_SUCCESS;
- apr_read_type_e block = APR_NONBLOCK_READ;
- conn_rec *c = f->r->connection;
- apr_bucket *flush_upto = NULL;
- apr_bucket_brigade *tmp_bb;
- apr_size_t tmp_bb_len = 0;
- core_server_config *conf;
- int seen_eor = 0;
-
- /*
- * Handle the AsyncFilter directive. We limit the filters that are
- * eligible for asynchronous handling here.
- */
- if (f->frec->ftype < f->c->async_filter) {
- ap_remove_output_filter(f);
- return ap_pass_brigade(f->next, bb);
- }
-
- conf = ap_get_core_module_config(f->r->server->module_config);
-
- /* Reinstate any buffered content */
- ap_filter_reinstate_brigade(f, bb, &flush_upto);
-
- /* After EOR is passed downstream, anything pooled on the request may
- * be destroyed (including bb!), but not tmp_bb which is acquired from
- * c->pool (and released after the below loop).
- */
- tmp_bb = ap_acquire_brigade(f->c);
-
- /* Don't touch *bb after seen_eor */
- while (status == APR_SUCCESS && !seen_eor && !APR_BRIGADE_EMPTY(bb)) {
- apr_bucket *bucket = APR_BRIGADE_FIRST(bb);
- int do_pass = 0;
-
- if (AP_BUCKET_IS_EOR(bucket)) {
- /* pass out everything and never come back again,
- * r is destroyed with this bucket!
- */
- APR_BRIGADE_CONCAT(tmp_bb, bb);
- ap_remove_output_filter(f);
- seen_eor = 1;
- }
- else {
- /* if the core has set aside data, back off and try later */
- if (!flush_upto) {
- if (ap_filter_should_yield(f->next)) {
- break;
- }
- }
- else if (flush_upto == bucket) {
- flush_upto = NULL;
- }
-
- /* have we found a morphing bucket? if so, force it to morph into
- * something safe to pass down to the connection filters without
- * needing to be set aside.
- */
- if (bucket->length == (apr_size_t)-1) {
- const char *data;
- apr_size_t size;
-
- status = apr_bucket_read(bucket, &data, &size, block);
- if (status != APR_SUCCESS) {
- if (!APR_STATUS_IS_EAGAIN(status)
- || block != APR_NONBLOCK_READ) {
- break;
- }
- /* Flush everything so far and retry in blocking mode */
- bucket = apr_bucket_flush_create(c->bucket_alloc);
- block = APR_BLOCK_READ;
- }
- else {
- tmp_bb_len += size;
- block = APR_NONBLOCK_READ;
- }
- }
- else {
- tmp_bb_len += bucket->length;
- }
-
- /* move the bucket to tmp_bb and check whether it exhausts bb or
- * brings tmp_bb above the limit; in both cases it's time to pass
- * everything down the chain.
- */
- APR_BUCKET_REMOVE(bucket);
- APR_BRIGADE_INSERT_TAIL(tmp_bb, bucket);
- if (APR_BRIGADE_EMPTY(bb)
- || APR_BUCKET_IS_FLUSH(bucket)
- || tmp_bb_len >= conf->flush_max_threshold) {
- do_pass = 1;
- }
- }
-
- if (do_pass || seen_eor) {
- status = ap_pass_brigade(f->next, tmp_bb);
- apr_brigade_cleanup(tmp_bb);
- tmp_bb_len = 0;
- }
- }
-
- /* Don't touch *bb after seen_eor */
- if (!seen_eor) {
- apr_status_t rv;
- APR_BRIGADE_PREPEND(bb, tmp_bb);
- rv = ap_filter_setaside_brigade(f, bb);
- if (status == APR_SUCCESS) {
- status = rv;
- }
- }
-
- ap_release_brigade(f->c, tmp_bb);
-
- return status;
-}
-
extern APR_OPTIONAL_FN_TYPE(authz_some_auth_required) *ap__authz_ap_some_auth_required;
AP_DECLARE(int) ap_some_auth_required(request_rec *r)
Index: modules/http/http_request.c
===================================================================
--- modules/http/http_request.c (revision 1875498)
+++ modules/http/http_request.c (working copy)
@@ -350,7 +350,6 @@ AP_DECLARE(void) ap_process_request_after_handler(
apr_bucket_brigade *bb;
apr_bucket *b;
conn_rec *c = r->connection;
- ap_filter_t *f;
bb = ap_acquire_brigade(c);
@@ -371,15 +370,9 @@ AP_DECLARE(void) ap_process_request_after_handler(
/* All the request filters should have bailed out on EOS, and in any
* case they shouldn't have to handle this EOR which will destroy the
- * request underneath them. So go straight to the core request filter
- * which (if any) will take care of the setaside buckets.
+ * request underneath them. So go straight to the connection filters.
*/
- for (f = r->output_filters; f; f = f->next) {
- if (f->frec == ap_request_core_filter_handle) {
- break;
- }
- }
- ap_pass_brigade(f ? f : c->output_filters, bb);
+ ap_pass_brigade(c->output_filters, bb);
/* The EOR bucket has either been handled by an output filter (eg.
* deleted or moved to a buffered_bb => no more in bb), or an error
Index: include/http_request.h
===================================================================
--- include/http_request.h (revision 1875498)
+++ include/http_request.h (working copy)
@@ -149,18 +149,6 @@ AP_DECLARE(int) ap_run_sub_req(request_rec *r);
*/
AP_DECLARE(void) ap_destroy_sub_req(request_rec *r);
-/**
- * An output filter to ensure that we avoid passing morphing buckets to
- * connection filters and in so doing defeat async write completion when
- * they are set aside. This should be inserted at the end of a request
- * filter stack.
- * @param f The current filter
- * @param bb The brigade to filter
- * @return status code
- */
-AP_CORE_DECLARE_NONSTD(apr_status_t) ap_request_core_filter(ap_filter_t *f,
- apr_bucket_brigade *bb);
-
/*
* Then there's the case that you want some other request to be served
* as the top-level request INSTEAD of what the client requested directly.
@@ -601,6 +589,15 @@ AP_DECLARE(int) ap_if_walk(request_rec *r);
AP_DECLARE_DATA extern const apr_bucket_type_t ap_bucket_type_eor;
/**
+ * Determine if a bucket is morphing, that is, one which changes its
+ * type on read (usually to "heap" allocated data), while moving
+ * itself to the next position to remain plugged until exhausted.
+ * @param e The bucket to inspect
+ * @return true or false
+ */
+#define AP_BUCKET_IS_MORPHING(e) ((e)->length == (apr_size_t)-1)
+
+/**
* Determine if a bucket is an End Of REQUEST (EOR) bucket
* @param e The bucket to inspect
* @return true or false
Index: server/util_filter.c
===================================================================
--- server/util_filter.c (revision 1875498)
+++ server/util_filter.c (working copy)
@@ -934,9 +934,20 @@ AP_DECLARE(int) ap_filter_prepare_brigade(ap_filte
return OK;
}
+static apr_status_t save_aside_brigade(struct ap_filter_private *fp,
+ apr_bucket_brigade *bb)
+{
+ if (!fp->deferred_pool) {
+ apr_pool_create(&fp->deferred_pool, fp->f->c->pool);
+ apr_pool_tag(fp->deferred_pool, "deferred_pool");
+ }
+ return ap_save_brigade(fp->f, &fp->bb, &bb, fp->deferred_pool);
+}
+
AP_DECLARE(apr_status_t) ap_filter_setaside_brigade(ap_filter_t *f,
apr_bucket_brigade *bb)
{
+ apr_status_t rv = APR_SUCCESS;
struct ap_filter_private *fp = f->priv;
ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, f->c,
@@ -945,34 +956,56 @@ AP_DECLARE(apr_status_t) ap_filter_setaside_brigad
(!fp->bb || APR_BRIGADE_EMPTY(fp->bb)) ? "empty" : "full",
f->frec->name);
+ /* Buckets in fp->bb are leftover from previous call to reinstate, so
+ * they happen after the buckets (re)set aside here.
+ */
+ if (fp->bb) {
+ APR_BRIGADE_CONCAT(bb, fp->bb);
+ }
+
if (!APR_BRIGADE_EMPTY(bb)) {
+ apr_bucket_brigade *tmp_bb = NULL;
+
/*
- * Set aside the brigade bb within fp->bb.
+ * Set aside the brigade bb to fp->bb.
*/
+
ap_filter_prepare_brigade(f);
+ do {
+ apr_bucket *e = APR_BRIGADE_FIRST(bb);
+ APR_BUCKET_REMOVE(e);
- /* decide what pool we setaside to, request pool or deferred pool? */
- if (f->r) {
- apr_bucket *e;
- for (e = APR_BRIGADE_FIRST(bb); e != APR_BRIGADE_SENTINEL(bb); e =
- APR_BUCKET_NEXT(e)) {
- if (APR_BUCKET_IS_TRANSIENT(e)) {
- int rv = apr_bucket_setaside(e, f->r->pool);
+ /* Assume that morphing buckets have the correct lifetime! */
+ if (AP_BUCKET_IS_MORPHING(e)) {
+ if (tmp_bb && !APR_BRIGADE_EMPTY(tmp_bb)) {
+ /* Save non-morphing buckets batched below. */
+ rv = save_aside_brigade(fp, tmp_bb);
if (rv != APR_SUCCESS) {
- return rv;
+ APR_BRIGADE_PREPEND(bb, tmp_bb);
+ break;
}
}
+ APR_BRIGADE_INSERT_TAIL(fp->bb, e);
}
- APR_BRIGADE_CONCAT(fp->bb, bb);
- }
- else {
- if (!fp->deferred_pool) {
- apr_pool_create(&fp->deferred_pool, f->c->pool);
- apr_pool_tag(fp->deferred_pool, "deferred_pool");
+ else {
+ /* Save calls to ap_save_brigade() by batching successive
+ * non-morphing buckets into a temporary brigade.
+ */
+ if (!tmp_bb) {
+ tmp_bb = ap_acquire_brigade(f->c);
+ }
+ APR_BRIGADE_INSERT_TAIL(tmp_bb, e);
}
- return ap_save_brigade(f, &fp->bb, &bb, fp->deferred_pool);
+ } while (!APR_BRIGADE_EMPTY(bb));
+
+ if (tmp_bb) {
+ /* Save any remainder. */
+ if (!APR_BRIGADE_EMPTY(tmp_bb)) {
+ rv = save_aside_brigade(fp, tmp_bb);
+ APR_BRIGADE_PREPEND(bb, tmp_bb);
+ }
+ ap_release_brigade(f->c, tmp_bb);
}
-
}
else if (fp->deferred_pool) {
/*
@@ -983,7 +1016,8 @@ AP_DECLARE(apr_status_t) ap_filter_setaside_brigad
apr_brigade_cleanup(fp->bb);
apr_pool_clear(fp->deferred_pool);
}
- return APR_SUCCESS;
+
+ return rv;
}
void ap_filter_adopt_brigade(ap_filter_t *f, apr_bucket_brigade *bb)
@@ -1007,8 +1041,8 @@ AP_DECLARE(apr_status_t) ap_filter_reinstate_briga
apr_bucket **flush_upto)
{
apr_bucket *bucket, *next;
- apr_size_t bytes_in_brigade, non_file_bytes_in_brigade;
- int eor_buckets_in_brigade, morphing_bucket_in_brigade;
+ apr_size_t bytes_in_brigade, memory_bytes_in_brigade;
+ int eor_buckets_in_brigade, morphing_buckets_in_brigade;
struct ap_filter_private *fp = f->priv;
core_server_config *conf;
@@ -1018,6 +1052,9 @@ AP_DECLARE(apr_status_t) ap_filter_reinstate_briga
(APR_BRIGADE_EMPTY(bb) ? "empty" : "full"),
f->frec->name);
+ /* Buckets in fp->bb are leftover from previous call to setaside, so
+ * they happen before the buckets reinstated (and added) here.
+ */
if (fp->bb) {
APR_BRIGADE_PREPEND(bb, fp->bb);
}
@@ -1029,68 +1066,58 @@ AP_DECLARE(apr_status_t) ap_filter_reinstate_briga
*flush_upto = NULL;
/*
- * Determine if and up to which bucket we need to do a blocking write:
+ * Determine if and up to which bucket the caller needs to do a blocking
+ * write:
*
- * a) The brigade contains a flush bucket: Do a blocking write
- * of everything up that point.
+ * a) The brigade contains at least one flush bucket: do blocking writes
+ * of everything up to the last one.
*
- * b) The request is in CONN_STATE_HANDLER state, and the brigade
- * contains at least flush_max_threshold bytes in non-file
- * buckets: Do blocking writes until the amount of data in the
- * buffer is less than flush_max_threshold. (The point of this
- * rule is to provide flow control, in case a handler is
- * streaming out lots of data faster than the data can be
+ * b) The brigade contains at least flush_max_threshold bytes in memory,
+ * that is non-file and non-morphing buckets: do blocking writes of
+ * everything up to the last bucket above flush_max_threshold.
+ * (The point of this rule is to provide flow control, in case a
+ * handler is streaming out lots of data faster than the data can be
* sent to the client.)
*
- * c) The request is in CONN_STATE_HANDLER state, and the brigade
- * contains at least flush_max_pipelined EOR buckets:
- * Do blocking writes until less than flush_max_pipelined EOR
- * buckets are left. (The point of this rule is to prevent too many
- * FDs being kept open by pipelined requests, possibly allowing a
- * DoS).
+ * c) The brigade contains at least flush_max_pipelined EOR buckets: do
+ * blocking writes until the last EOR above flush_max_pipelined.
+ * (The point of this rule is to prevent too many FDs being kept open
+ * by pipelined requests, possibly allowing a DoS).
*
- * d) The request is being served by a connection filter and the
- * brigade contains a morphing bucket: If there was no other
- * reason to do a blocking write yet, try reading the bucket. If its
- * contents fit into memory before flush_max_threshold is reached,
- * everything is fine. Otherwise we need to do a blocking write the
- * up to and including the morphing bucket, because ap_save_brigade()
- * would read the whole bucket into memory later on.
+ * Note that morphing buckets use no memory until read, so they don't
+ * account for point b) above. Both ap_filter_reinstate_brigade() and
+ * ap_filter_setaside_brigade() assume that morphing buckets have an
+ * appropriate lifetime (until next EOR for instance), so they are simply
+ * setaside or reinstated by moving them from/to fp->bb to/from bb.
*/
bytes_in_brigade = 0;
- non_file_bytes_in_brigade = 0;
+ memory_bytes_in_brigade = 0;
eor_buckets_in_brigade = 0;
- morphing_bucket_in_brigade = 0;
+ morphing_buckets_in_brigade = 0;
- conf = ap_get_core_module_config(f->c->base_server->module_config);
+ conf = f->r ? ap_get_core_module_config(f->r->server->module_config)
+ : ap_get_core_module_config(f->c->base_server->module_config);
for (bucket = APR_BRIGADE_FIRST(bb); bucket != APR_BRIGADE_SENTINEL(bb);
bucket = next) {
next = APR_BUCKET_NEXT(bucket);
- if (!APR_BUCKET_IS_METADATA(bucket)) {
- if (bucket->length == (apr_size_t)-1) {
- /*
- * A setaside of morphing buckets would read everything into
- * memory. Instead, we will flush everything up to and
- * including this bucket.
- */
- morphing_bucket_in_brigade = 1;
+ if (AP_BUCKET_IS_EOR(bucket)) {
+ eor_buckets_in_brigade++;
+ }
+ else if (AP_BUCKET_IS_MORPHING(bucket)) {
+ morphing_buckets_in_brigade++;
+ }
+ else if (bucket->length) {
+ bytes_in_brigade += bucket->length;
+ if (!APR_BUCKET_IS_FILE(bucket)) {
+ memory_bytes_in_brigade += bucket->length;
}
- else {
- bytes_in_brigade += bucket->length;
- if (!APR_BUCKET_IS_FILE(bucket))
- non_file_bytes_in_brigade += bucket->length;
- }
}
- else if (AP_BUCKET_IS_EOR(bucket)) {
- eor_buckets_in_brigade++;
- }
if (APR_BUCKET_IS_FLUSH(bucket)
- || non_file_bytes_in_brigade >= conf->flush_max_threshold
- || (!f->r && morphing_bucket_in_brigade)
+ || memory_bytes_in_brigade > conf->flush_max_threshold
|| eor_buckets_in_brigade > conf->flush_max_pipelined) {
/* this segment of the brigade MUST be sent before returning. */
@@ -1097,22 +1124,20 @@ AP_DECLARE(apr_status_t) ap_filter_reinstate_briga
if (APLOGctrace6(f->c)) {
char *reason = APR_BUCKET_IS_FLUSH(bucket) ?
"FLUSH bucket" :
- (non_file_bytes_in_brigade >= conf->flush_max_threshold) ?
- "max threshold" :
- (!f->r && morphing_bucket_in_brigade) ? "morphing bucket" :
- "max requests in pipeline";
+ (memory_bytes_in_brigade >= conf->flush_max_threshold) ?
+ "max threshold" : "max requests in pipeline";
ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, f->c,
"will flush because of %s", reason);
ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, f->c,
"seen in brigade%s: bytes: %" APR_SIZE_T_FMT
- ", non-file bytes: %" APR_SIZE_T_FMT ", eor "
+ ", memory bytes: %" APR_SIZE_T_FMT ", eor "
"buckets: %d, morphing buckets: %d",
*flush_upto == NULL ? " so far"
: " since last flush point",
bytes_in_brigade,
- non_file_bytes_in_brigade,
+ memory_bytes_in_brigade,
eor_buckets_in_brigade,
- morphing_bucket_in_brigade);
+ morphing_buckets_in_brigade);
}
/*
* Defer the actual blocking write to avoid doing many writes.
@@ -1120,18 +1145,19 @@ AP_DECLARE(apr_status_t) ap_filter_reinstate_briga
*flush_upto = next;
bytes_in_brigade = 0;
- non_file_bytes_in_brigade = 0;
+ memory_bytes_in_brigade = 0;
eor_buckets_in_brigade = 0;
- morphing_bucket_in_brigade = 0;
+ morphing_buckets_in_brigade = 0;
}
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, f->c,
- "brigade contains: bytes: %" APR_SIZE_T_FMT
+ "brigade contains%s: bytes: %" APR_SIZE_T_FMT
", non-file bytes: %" APR_SIZE_T_FMT
", eor buckets: %d, morphing buckets: %d",
- bytes_in_brigade, non_file_bytes_in_brigade,
- eor_buckets_in_brigade, morphing_bucket_in_brigade);
+ *flush_upto == NULL ? "" : " since last flush point",
+ bytes_in_brigade, memory_bytes_in_brigade,
+ eor_buckets_in_brigade, morphing_buckets_in_brigade);
return APR_SUCCESS;
}
@@ -1200,8 +1226,8 @@ AP_DECLARE_NONSTD(int) ap_filter_output_pending(co
}
/* Flush outer most filters first for ap_filter_should_yield(f->next)
- * to be relevant in the previous ones (e.g. ap_request_core_filter()
- * won't pass its buckets if its next filters yield already).
+ * to be relevant in the previous ones (async filters won't pass their
+ * buckets if their next filters yield already).
*/
bb = ap_acquire_brigade(c);
for (fp = APR_RING_LAST(x->pending_output_filters);