On Thu, Jul 1, 2021 at 12:38 PM Yann Ylavic <ylavic....@gmail.com> wrote: > > On Wed, Jun 30, 2021 at 9:42 AM Joe Orton <jor...@redhat.com> wrote: > > > > If so I wonder if it wouldn't be better to overload FLUSH for this, e.g. > > by having a FLUSH bucket with a non-NULL ->data pointer or something? > > The core knows it is special but everywhere else treats as FLUSH. > > That's a great idea, let me try that.
Here is a new patch; the semantics of WC buckets are defined as: /** * @brief Write Completion (WC) bucket * * A WC bucket is a FLUSH bucket with special ->data == &ap_bucket_wc_data, * still both AP_BUCKET_IS_WC() and APR_BUCKET_IS_FLUSH() hold for them so * they have the same semantics for most filters, namely: * Everything produced before shall be passed to the next filter, including * the WC/FLUSH bucket itself. * The distinction between WC and FLUSH buckets is only for filters that care * about write completion (calling ap_filter_reinstate_brigade() with non-NULL * flush_upto), those can setaside WC buckets and the preceding data provided * they have first determined that the next filter(s) have pending data * already, usually by calling ap_filter_should_yield(f->next). */ The only filters that care about write completion for now are ap_core_output_filter() and ssl_io_filter_output(), which try to fill the pipe as much as possible, using ap_filter_reinstate_brigade(&flush_upto) to determine whether they should flush (blocking) or set aside their remaining data. So ap_filter_reinstate_brigade() is made not to treat WC buckets as FLUSH buckets, which keeps the above filters working as before (and correctly w.r.t. WC bucket semantics). I first thought of adding a new ap_filter_rec->proto_flags value like AP_FILTER_PROTO_WC_READY to do this only for filters registered with this flag, and then registering ap_core_output_filter() and ssl_io_filter_output() accordingly, but since they are the only users of ap_filter_reinstate_brigade() with flush_upto != NULL, I kept it simple for now. WDYT? Cheers; Yann.
Index: include/ap_mmn.h =================================================================== --- include/ap_mmn.h (revision 1892424) +++ include/ap_mmn.h (working copy) @@ -673,7 +673,7 @@ * ap_proxy_tunnel_conn_get_transferred() change * ap_proxy_transfer_between_connections() sent to apr_off_t *. * 20210531.0 (2.5.1-dev) add conn_rec->outgoing and ap_ssl_bind_outgoing() - * 20210531.1 (2.5.1-dev) Add ap_bucket_type_wc, ap_bucket_wc_make() and + * 20210531.1 (2.5.1-dev) Add ap_bucket_wc_data, ap_bucket_wc_make() and * ap_bucket_wc_create() to util_filter.h * 20210531.2 (2.5.1-dev) Add ap_proxy_get_worker_ex() and * ap_proxy_define_worker_ex() to mod_proxy.h Index: include/util_filter.h =================================================================== --- include/util_filter.h (revision 1892424) +++ include/util_filter.h (working copy) @@ -763,15 +763,31 @@ AP_DECLARE(void) ap_filter_protocol(ap_filter_t* f /** Filter is incompatible with "Cache-Control: no-transform" */ #define AP_FILTER_PROTO_TRANSFORM 0x20 -/** Write Completion (WC) bucket */ -AP_DECLARE_DATA extern const apr_bucket_type_t ap_bucket_type_wc; +/** + * @brief Write Completion (WC) bucket + * + * A WC bucket is a FLUSH bucket with special ->data == &ap_bucket_wc_data, + * still both AP_BUCKET_IS_WC() and APR_BUCKET_IS_FLUSH() hold for them so + * they have the same semantics for most filters, namely: + * Everything produced before shall be passed to the next filter, including + * the WC/FLUSH bucket itself. + * The distinction between WC and FLUSH buckets is only for filters that care + * about write completion (calling ap_filter_reinstate_brigade() with non-NULL + * flush_upto), those can setaside WC buckets and the preceding data provided + * they have first determined that the next filter(s) have pending data + * already, usually by calling ap_filter_should_yield(f->next). 
+ */ +/** Write Completion (WC) bucket data mark */ +AP_DECLARE_DATA extern const char ap_bucket_wc_data; + /** * Determine if a bucket is a Write Completion (WC) bucket * @param e The bucket to inspect * @return true or false */ -#define AP_BUCKET_IS_WC(e) ((e)->type == &ap_bucket_type_wc) +#define AP_BUCKET_IS_WC(e) (APR_BUCKET_IS_FLUSH(e) && \ + (e)->data == (void *)&ap_bucket_wc_data) /** * Make the bucket passed in a Write Completion (WC) bucket Index: server/util_filter.c =================================================================== --- server/util_filter.c (revision 1892424) +++ server/util_filter.c (working copy) @@ -976,7 +976,9 @@ AP_DECLARE(apr_status_t) ap_filter_setaside_brigad e = next) { next = APR_BUCKET_NEXT(e); - /* Strip WC buckets added by ap_filter_output_pending(). */ + /* WC buckets will be added back by ap_filter_output_pending() + * at the tail. + */ if (AP_BUCKET_IS_WC(e)) { apr_bucket_delete(e); continue; @@ -1069,6 +1071,7 @@ AP_DECLARE(apr_status_t) ap_filter_reinstate_briga int eor_buckets_in_brigade, opaque_buckets_in_brigade; struct ap_filter_private *fp = f->priv; core_server_config *conf; + int is_flush; ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, f->c, "reinstate %s brigade to %s brigade in '%s' %sput filter", @@ -1132,7 +1135,15 @@ AP_DECLARE(apr_status_t) ap_filter_reinstate_briga bucket = next) { next = APR_BUCKET_NEXT(bucket); - if (AP_BUCKET_IS_EOR(bucket)) { + /* When called with flush_upto != NULL, we assume that the caller does + * the right thing to potentially setaside WC buckets (per semantics), + * so we don't treat them as FLUSH(_upto) here. 
+ */ + is_flush = (APR_BUCKET_IS_FLUSH(bucket) && !AP_BUCKET_IS_WC(bucket)); + if (is_flush) { + /* handled below */ + } + else if (AP_BUCKET_IS_EOR(bucket)) { eor_buckets_in_brigade++; } else if (bucket->length == (apr_size_t)-1) { @@ -1145,7 +1156,7 @@ AP_DECLARE(apr_status_t) ap_filter_reinstate_briga } } - if (APR_BUCKET_IS_FLUSH(bucket) + if (is_flush || (memory_bytes_in_brigade > conf->flush_max_threshold) || (conf->flush_max_pipelined >= 0 && eor_buckets_in_brigade > conf->flush_max_pipelined)) { @@ -1152,7 +1163,7 @@ AP_DECLARE(apr_status_t) ap_filter_reinstate_briga /* this segment of the brigade MUST be sent before returning. */ if (APLOGctrace6(f->c)) { - char *reason = APR_BUCKET_IS_FLUSH(bucket) ? + char *reason = is_flush ? "FLUSH bucket" : (memory_bytes_in_brigade > conf->flush_max_threshold) ? "max threshold" : "max requests in pipeline"; @@ -1400,22 +1411,15 @@ AP_DECLARE(void) ap_filter_protocol(ap_filter_t *f f->frec->proto_flags = flags ; } +/* Write Completion (WC) bucket implementation */ -static apr_status_t wc_bucket_read(apr_bucket *b, const char **str, - apr_size_t *len, apr_read_type_e block) -{ - *str = NULL; - *len = 0; - return APR_SUCCESS; -} +AP_DECLARE_DATA const char ap_bucket_wc_data; AP_DECLARE(apr_bucket *) ap_bucket_wc_make(apr_bucket *b) { - b->length = 0; - b->start = 0; - b->data = NULL; - b->type = &ap_bucket_type_wc; - + /* FLUSH bucket with special ->data mark (instead of NULL) */ + b = apr_bucket_flush_make(b); + b->data = (void *)&ap_bucket_wc_data; return b; } @@ -1428,12 +1432,3 @@ AP_DECLARE(apr_bucket *) ap_bucket_wc_create(apr_b b->list = list; return ap_bucket_wc_make(b); } - -AP_DECLARE_DATA const apr_bucket_type_t ap_bucket_type_wc = { - "WC", 5, APR_BUCKET_METADATA, - apr_bucket_destroy_noop, - wc_bucket_read, - apr_bucket_setaside_noop, - apr_bucket_split_notimpl, - apr_bucket_simple_copy -}; Index: modules/proxy/proxy_util.c =================================================================== --- 
modules/proxy/proxy_util.c (revision 1892424) +++ modules/proxy/proxy_util.c (working copy) @@ -4551,6 +4551,7 @@ PROXY_DECLARE(apr_status_t) ap_proxy_transfer_betw APR_BRIGADE_INSERT_TAIL(bb_o, b); } else { + /* Prevent setaside/coalescing by intermediate filters. */ b = ap_bucket_wc_create(bb_o->bucket_alloc); APR_BRIGADE_INSERT_TAIL(bb_o, b); }