On Sunday 22 January 2012, Stefan Fritsch wrote:
> I think that your patch is correct. However, as an optimization,
> one could try reading the morphing bucket until there are
> THRESHOLD_MAX_BUFFER bytes in memory. If all morphing buckets in
> the brigade disappear before reaching that limit, one could still
> do async write completion.
I have attached a modified version of your patch that includes this
optimization and adds some comments.
diff --git a/server/core_filters.c b/server/core_filters.c
index 73828af..e2ecb47 100644
--- a/server/core_filters.c
+++ b/server/core_filters.c
@@ -370,7 +370,7 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
apr_bucket_brigade *bb = NULL;
apr_bucket *bucket, *next, *flush_upto = NULL;
apr_size_t bytes_in_brigade, non_file_bytes_in_brigade;
- int eor_buckets_in_brigade;
+ int eor_buckets_in_brigade, morphing_bucket_in_brigade;
apr_status_t rv;
/* Fail quickly if the connection has already been aborted. */
@@ -416,7 +416,7 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
}
/* Scan through the brigade and decide whether to attempt a write,
- * based on the following rules:
+ * and how much to write, based on the following rules:
*
* 1) The new_bb is null: Do a nonblocking write of as much as
* possible: do a nonblocking write of as much data as possible,
@@ -425,10 +425,13 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
* completion and has just determined that this connection
* is writable.)
*
- * 2) The brigade contains a flush bucket: Do a blocking write
+ * 2) Determine if and up to which bucket we need to do a blocking
+ * write:
+ *
+ * a) The brigade contains a flush bucket: Do a blocking write
* of everything up that point.
*
- * 3) The request is in CONN_STATE_HANDLER state, and the brigade
+ * b) The request is in CONN_STATE_HANDLER state, and the brigade
* contains at least THRESHOLD_MAX_BUFFER bytes in non-file
* buckets: Do blocking writes until the amount of data in the
* buffer is less than THRESHOLD_MAX_BUFFER. (The point of this
@@ -436,14 +439,25 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
* streaming out lots of data faster than the data can be
* sent to the client.)
*
- * 4) The request is in CONN_STATE_HANDLER state, and the brigade
+ * c) The request is in CONN_STATE_HANDLER state, and the brigade
* contains at least MAX_REQUESTS_IN_PIPELINE EOR buckets:
* Do blocking writes until less than MAX_REQUESTS_IN_PIPELINE EOR
* buckets are left. (The point of this rule is to prevent too many
* FDs being kept open by pipelined requests, possibly allowing a
* DoS).
*
- * 5) The brigade contains at least THRESHOLD_MIN_WRITE
+ * d) The brigade contains a morphing bucket: If there was no other
+ * reason to do a blocking write yet, try reading the bucket. If its
+ * contents fit into memory before THRESHOLD_MAX_BUFFER is reached,
+ * everything is fine. Otherwise we need to do a blocking write
+ * up to and including the morphing bucket, because ap_save_brigade()
+ * would read the whole bucket into memory later on.
+ *
+ * 3) Actually do the blocking write up to the last bucket determined
+ * by rules 2a-d. The point of doing only one flush is to make as
+ * few calls to writev() as possible.
+ *
+ * 4) If the brigade contains at least THRESHOLD_MIN_WRITE
* bytes: Do a nonblocking write of as much data as possible,
* then save the rest in ctx->buffered_bb.
*/
@@ -465,40 +479,58 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
bytes_in_brigade = 0;
non_file_bytes_in_brigade = 0;
eor_buckets_in_brigade = 0;
+ morphing_bucket_in_brigade = 0;
+
for (bucket = APR_BRIGADE_FIRST(bb); bucket != APR_BRIGADE_SENTINEL(bb);
bucket = next) {
next = APR_BUCKET_NEXT(bucket);
if (!APR_BUCKET_IS_METADATA(bucket)) {
- if (bucket->length == (apr_size_t)-1) {
- const char *data;
- apr_size_t length;
- /* XXX support nonblocking read here? */
- rv = apr_bucket_read(bucket, &data, &length, APR_BLOCK_READ);
- if (rv != APR_SUCCESS) {
- return rv;
- }
- /* reading may have split the bucket, so recompute next: */
- next = APR_BUCKET_NEXT(bucket);
+ if (bucket->length == (apr_size_t)-1 && flush_upto != NULL) {
+ /*
+ * A setaside of morphing buckets would read everything into
+ * memory. Instead, we will flush everything up to and
+ * including this bucket.
+ */
+ morphing_bucket_in_brigade = 1;
}
- bytes_in_brigade += bucket->length;
- if (!APR_BUCKET_IS_FILE(bucket)) {
- non_file_bytes_in_brigade += bucket->length;
+ else {
+ if (bucket->length == (apr_size_t)-1) {
+ const char *data;
+ apr_size_t length;
+ /*
+ * We have a morphing bucket but not yet enough data to
+ * force a flush. Try reading a bit from the bucket and
+ * hope that we can read it into memory before we hit
+ * THRESHOLD_MAX_BUFFER.
+ */
+ rv = apr_bucket_read(bucket, &data, &length, APR_BLOCK_READ);
+ if (rv != APR_SUCCESS)
+ return rv;
+ /* reading may have split the bucket, so recompute next: */
+ next = APR_BUCKET_NEXT(bucket);
+ }
+ bytes_in_brigade += bucket->length;
+ if (!APR_BUCKET_IS_FILE(bucket))
+ non_file_bytes_in_brigade += bucket->length;
}
}
else if (AP_BUCKET_IS_EOR(bucket)) {
eor_buckets_in_brigade++;
}
- if (APR_BUCKET_IS_FLUSH(bucket) ||
- (non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER) ||
- (eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE) )
- {
+ if (APR_BUCKET_IS_FLUSH(bucket)
+ || non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER
+ || morphing_bucket_in_brigade
+ || eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE) {
+ /* this segment of the brigade MUST be sent before returning. */
+
if (APLOGctrace6(c)) {
char *reason = APR_BUCKET_IS_FLUSH(bucket) ?
"FLUSH bucket" :
(non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER) ?
"THRESHOLD_MAX_BUFFER" :
+ morphing_bucket_in_brigade ? "morphing bucket" :
"MAX_REQUESTS_IN_PIPELINE";
ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, c,
"core_output_filter: flushing because of %s",
@@ -512,6 +544,7 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
bytes_in_brigade = 0;
non_file_bytes_in_brigade = 0;
eor_buckets_in_brigade = 0;
+ morphing_bucket_in_brigade = 0;
}
}