On 1/16/21 3:08 PM, minf...@apache.org wrote:
> Author: minfrin
> Date: Sat Jan 16 14:08:29 2021
> New Revision: 1885573
> 
> URL: http://svn.apache.org/viewvc?rev=1885573&view=rev
> Log:
> Backport to 2.4:
> 
>   *) core: output filtering improvements (ease following patches, align 
> trunk/2.4)
>      trunk patch: https://svn.apache.org/r1836032
>                   https://svn.apache.org/r1884295
>                   https://svn.apache.org/r1884296
>                   https://svn.apache.org/r1884304
>                   https://svn.apache.org/r1836237
>                   https://svn.apache.org/r1836258
>                   https://svn.apache.org/r1836354
>                   https://svn.apache.org/r1843939
>      2.4.x patch: 
> http://people.apache.org/~ylavic/patches/2.4.x-core_output_filtering-2on5.patch
>                   https://github.com/apache/httpd/pull/156
>      +1: ylavic, covener, minfrin
>      ylavic: These core output filter changes are needed for the proxy
>              tunneling loop to work properly/non-blocking (PR 158 below). They
>              do not include the major filter setaside/reinstate changes from
>              trunk, relying on the existing 2.4 c->data_in_{input,output}_filter
>              flags only.
> 
> 
> Modified:
>     httpd/httpd/branches/2.4.x/CHANGES
>     httpd/httpd/branches/2.4.x/STATUS
>     httpd/httpd/branches/2.4.x/docs/manual/mod/core.xml
>     httpd/httpd/branches/2.4.x/include/ap_mmn.h
>     httpd/httpd/branches/2.4.x/include/http_core.h
>     httpd/httpd/branches/2.4.x/modules/ssl/ssl_engine_io.c
>     httpd/httpd/branches/2.4.x/server/core.c
>     httpd/httpd/branches/2.4.x/server/core_filters.c
> 

> Modified: httpd/httpd/branches/2.4.x/server/core_filters.c
> URL: 
> http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/server/core_filters.c?rev=1885573&r1=1885572&r2=1885573&view=diff
> ==============================================================================
> --- httpd/httpd/branches/2.4.x/server/core_filters.c (original)
> +++ httpd/httpd/branches/2.4.x/server/core_filters.c Sat Jan 16 14:08:29 2021
> @@ -79,9 +79,10 @@ do { \
>  
>  struct core_output_filter_ctx {
>      apr_bucket_brigade *buffered_bb;
> -    apr_bucket_brigade *tmp_flush_bb;
>      apr_pool_t *deferred_write_pool;
>      apr_size_t bytes_written;
> +    struct iovec *vec;
> +    apr_size_t nvec;
>  };
>  
>  struct core_filter_ctx {
> @@ -335,50 +336,132 @@ static void setaside_remaining_output(ap
>  
>  static apr_status_t send_brigade_nonblocking(apr_socket_t *s,
>                                               apr_bucket_brigade *bb,
> -                                             apr_size_t *bytes_written,
> +                                             core_output_filter_ctx_t *ctx,
>                                               conn_rec *c);
>  
> -static void remove_empty_buckets(apr_bucket_brigade *bb);
> -
> -static apr_status_t send_brigade_blocking(apr_socket_t *s,
> -                                          apr_bucket_brigade *bb,
> -                                          apr_size_t *bytes_written,
> -                                          conn_rec *c);
> -
>  static apr_status_t writev_nonblocking(apr_socket_t *s,
> -                                       struct iovec *vec, apr_size_t nvec,
>                                         apr_bucket_brigade *bb,
> -                                       apr_size_t *cumulative_bytes_written,
> +                                       core_output_filter_ctx_t *ctx,
> +                                       apr_size_t bytes_to_write,
> +                                       apr_size_t nvec,
>                                         conn_rec *c);
>  
>  #if APR_HAS_SENDFILE
>  static apr_status_t sendfile_nonblocking(apr_socket_t *s,
>                                           apr_bucket *bucket,
> -                                         apr_size_t 
> *cumulative_bytes_written,
> +                                         core_output_filter_ctx_t *ctx,
>                                           conn_rec *c);
>  #endif
>  
>  /* XXX: Should these be configurable parameters? */
>  #define THRESHOLD_MIN_WRITE 4096
> -#define THRESHOLD_MAX_BUFFER 65536
> -#define MAX_REQUESTS_IN_PIPELINE 5
>  
>  /* Optional function coming from mod_logio, used for logging of output
>   * traffic
>   */
>  extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_out) *ap__logio_add_bytes_out;
>  
> +static apr_status_t should_send_brigade(apr_bucket_brigade *bb,
> +                                        conn_rec *c, int *flush)
> +{
> +    core_server_config *conf =
> +        ap_get_core_module_config(c->base_server->module_config);
> +    apr_size_t total_bytes = 0, non_file_bytes = 0;
> +    apr_uint32_t eor_buckets = 0;
> +    apr_bucket *bucket;
> +    int need_flush = 0;
> +
> +    /* Scan through the brigade and decide whether we need to flush it,
> +     * based on the following rules:
> +     *
> +     *  a) The brigade contains a flush bucket: Do a blocking write
> +     *     of everything up that point.
> +     *
> +     *  b) The request is in CONN_STATE_HANDLER state, and the brigade
> +     *     contains at least flush_max_threshold bytes in non-file
> +     *     buckets: Do blocking writes until the amount of data in the
> +     *     buffer is less than flush_max_threshold.  (The point of this
> +     *     rule is to provide flow control, in case a handler is
> +     *     streaming out lots of data faster than the data can be
> +     *     sent to the client.)
> +     *
> +     *  c) The request is in CONN_STATE_HANDLER state, and the brigade
> +     *     contains at least flush_max_pipelined EOR buckets:
> +     *     Do blocking writes until less than flush_max_pipelined EOR
> +     *     buckets are left. (The point of this rule is to prevent too many
> +     *     FDs being kept open by pipelined requests, possibly allowing a
> +     *     DoS).
> +     *
> +     *  d) The brigade contains a morphing bucket: otherwise 
> ap_save_brigade()
> +     *     could read the whole bucket into memory.
> +     */
> +    for (bucket = APR_BRIGADE_FIRST(bb);
> +         bucket != APR_BRIGADE_SENTINEL(bb);
> +         bucket = APR_BUCKET_NEXT(bucket)) {
> +
> +        if (!APR_BUCKET_IS_METADATA(bucket)) {
> +            if (bucket->length == (apr_size_t)-1) {
> +                if (flush) {
> +                    ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, c,
> +                                  "core_output_filter: flushing because "
> +                                  "of morphing bucket");
> +                }
> +                need_flush = 1;
> +                break;
> +            }
> +
> +            total_bytes += bucket->length;
> +            if (!APR_BUCKET_IS_FILE(bucket)) {
> +                non_file_bytes += bucket->length;
> +                if (non_file_bytes > conf->flush_max_threshold) {
> +                    if (flush) {
> +                        ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, c,
> +                                      "core_output_filter: flushing because "
> +                                      "of max threshold");
> +                    }
> +                    need_flush = 1;
> +                    break;
> +                }
> +            }
> +        }
> +        else if (APR_BUCKET_IS_FLUSH(bucket)) {
> +            if (flush) {
> +                ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, c,
> +                              "core_output_filter: flushing because "
> +                              "of FLUSH bucket");
> +            }
> +            need_flush = 1;
> +            break;
> +        }
> +        else if (AP_BUCKET_IS_EOR(bucket)
> +                 && conf->flush_max_pipelined >= 0
> +                 && ++eor_buckets > conf->flush_max_pipelined) {
> +            if (flush) {
> +                ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, c,
> +                              "core_output_filter: flushing because "
> +                              "of max pipelined");
> +            }
> +            need_flush = 1;
> +            break;
> +        }
> +    }
> +    if (flush) {
> +        *flush = need_flush;
> +    }
> +
> +    /* Also send if above flush_min_threshold, or if there are FILE buckets 
> */
> +    return (need_flush
> +            || total_bytes >= THRESHOLD_MIN_WRITE
> +            || total_bytes > non_file_bytes);
> +}
> +
>  apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade 
> *new_bb)
>  {
>      conn_rec *c = f->c;
>      core_net_rec *net = f->ctx;
>      core_output_filter_ctx_t *ctx = net->out_ctx;
>      apr_bucket_brigade *bb = NULL;
> -    apr_bucket *bucket, *next, *flush_upto = NULL;
> -    apr_size_t bytes_in_brigade, non_file_bytes_in_brigade;
> -    int eor_buckets_in_brigade, morphing_bucket_in_brigade;
> -    apr_status_t rv;
> -    int loglevel = ap_get_conn_module_loglevel(c, APLOG_MODULE_INDEX);
> +    apr_status_t rv = APR_SUCCESS;
>  
>      /* Fail quickly if the connection has already been aborted. */
>      if (c->aborted) {
> @@ -392,12 +475,10 @@ apr_status_t ap_core_output_filter(ap_fi
>          ctx = apr_pcalloc(c->pool, sizeof(*ctx));
>          net->out_ctx = (core_output_filter_ctx_t *)ctx;
>          /*
> -         * Need to create tmp brigade with correct lifetime. Passing
> -         * NULL to apr_brigade_split_ex would result in a brigade
> +         * Need to create buffered_bb brigade with correct lifetime. Passing
> +         * NULL to ap_save_brigade() would result in a brigade
>           * allocated from bb->pool which might be wrong.
>           */
> -        ctx->tmp_flush_bb = apr_brigade_create(c->pool, c->bucket_alloc);
> -        /* same for buffered_bb and ap_save_brigade */
>          ctx->buffered_bb = apr_brigade_create(c->pool, c->bucket_alloc);
>      }
>  
> @@ -412,191 +493,59 @@ apr_status_t ap_core_output_filter(ap_fi
>          else {
>              bb = ctx->buffered_bb;
>          }
> -        c->data_in_output_filters = 0;
>      }
>      else if (new_bb == NULL) {
> +        c->data_in_output_filters = 0;
>          return APR_SUCCESS;
>      }
>  
> -    /* Scan through the brigade and decide whether to attempt a write,
> -     * and how much to write, based on the following rules:
> -     *
> -     *  1) The new_bb is null: Do a nonblocking write of as much as
> -     *     possible: do a nonblocking write of as much data as possible,
> -     *     then save the rest in ctx->buffered_bb.  (If new_bb == NULL,
> -     *     it probably means that the MPM is doing asynchronous write
> -     *     completion and has just determined that this connection
> -     *     is writable.)
> -     *
> -     *  2) Determine if and up to which bucket we need to do a blocking
> -     *     write:
> -     *
> -     *  a) The brigade contains a flush bucket: Do a blocking write
> -     *     of everything up that point.
> -     *
> -     *  b) The request is in CONN_STATE_HANDLER state, and the brigade
> -     *     contains at least THRESHOLD_MAX_BUFFER bytes in non-file
> -     *     buckets: Do blocking writes until the amount of data in the
> -     *     buffer is less than THRESHOLD_MAX_BUFFER.  (The point of this
> -     *     rule is to provide flow control, in case a handler is
> -     *     streaming out lots of data faster than the data can be
> -     *     sent to the client.)
> -     *
> -     *  c) The request is in CONN_STATE_HANDLER state, and the brigade
> -     *     contains at least MAX_REQUESTS_IN_PIPELINE EOR buckets:
> -     *     Do blocking writes until less than MAX_REQUESTS_IN_PIPELINE EOR
> -     *     buckets are left. (The point of this rule is to prevent too many
> -     *     FDs being kept open by pipelined requests, possibly allowing a
> -     *     DoS).
> -     *
> -     *  d) The brigade contains a morphing bucket: If there was no other
> -     *     reason to do a blocking write yet, try reading the bucket. If its
> -     *     contents fit into memory before THRESHOLD_MAX_BUFFER is reached,
> -     *     everything is fine. Otherwise we need to do a blocking write the
> -     *     up to and including the morphing bucket, because ap_save_brigade()
> -     *     would read the whole bucket into memory later on.
> -     *
> -     *  3) Actually do the blocking write up to the last bucket determined
> -     *     by rules 2a-d. The point of doing only one flush is to make as
> -     *     few calls to writev() as possible.
> -     *
> -     *  4) If the brigade contains at least THRESHOLD_MIN_WRITE
> -     *     bytes: Do a nonblocking write of as much data as possible,
> -     *     then save the rest in ctx->buffered_bb.
> -     */
> -
> -    if (new_bb == NULL) {
> -        rv = send_brigade_nonblocking(net->client_socket, bb,
> -                                      &(ctx->bytes_written), c);
> -        if (rv != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(rv)) {
> -            /* The client has aborted the connection */
> -            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
> -                          "core_output_filter: writing data to the network");
> -            apr_brigade_cleanup(bb);
> -            c->aborted = 1;
> -            return rv;
> -        }
> -        setaside_remaining_output(f, ctx, bb, c);
> -        return APR_SUCCESS;
> -    }
> -
> -    bytes_in_brigade = 0;
> -    non_file_bytes_in_brigade = 0;
> -    eor_buckets_in_brigade = 0;
> -    morphing_bucket_in_brigade = 0;
> -
> -    for (bucket = APR_BRIGADE_FIRST(bb); bucket != APR_BRIGADE_SENTINEL(bb);
> -         bucket = next) {
> -        next = APR_BUCKET_NEXT(bucket);
> -
> -        if (!APR_BUCKET_IS_METADATA(bucket)) {
> -            if (bucket->length == (apr_size_t)-1) {
> -                /*
> -                 * A setaside of morphing buckets would read everything into
> -                 * memory. Instead, we will flush everything up to and
> -                 * including this bucket.
> -                 */
> -                morphing_bucket_in_brigade = 1;
> +    if (!new_bb || should_send_brigade(bb, c, NULL)) {
> +        apr_socket_t *sock = net->client_socket;
> +        apr_interval_time_t sock_timeout = 0;
> +        int flush;
> +
> +        /* Non-blocking writes on the socket in any case. */
> +        apr_socket_timeout_get(sock, &sock_timeout);
> +        apr_socket_timeout_set(sock, 0);
> +
> +        do {
> +            rv = send_brigade_nonblocking(sock, bb, ctx, c);
> +            if (!new_bb || !APR_STATUS_IS_EAGAIN(rv)) {
> +                break;
>              }
> -            else {
> -                bytes_in_brigade += bucket->length;
> -                if (!APR_BUCKET_IS_FILE(bucket))
> -                    non_file_bytes_in_brigade += bucket->length;
> -            }
> -        }
> -        else if (AP_BUCKET_IS_EOR(bucket)) {
> -            eor_buckets_in_brigade++;
> -        }
> -
> -        if (APR_BUCKET_IS_FLUSH(bucket)
> -            || non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER
> -            || morphing_bucket_in_brigade
> -            || eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE) {
> -            /* this segment of the brigade MUST be sent before returning. */
> -
> -            if (loglevel >= APLOG_TRACE6) {
> -                char *reason = APR_BUCKET_IS_FLUSH(bucket) ?
> -                               "FLUSH bucket" :
> -                               (non_file_bytes_in_brigade >= 
> THRESHOLD_MAX_BUFFER) ?
> -                               "THRESHOLD_MAX_BUFFER" :
> -                               morphing_bucket_in_brigade ? "morphing 
> bucket" :
> -                               "MAX_REQUESTS_IN_PIPELINE";
> -                ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, c,
> -                              "will flush because of %s", reason);
> -                ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, c,
> -                              "seen in brigade%s: bytes: %" APR_SIZE_T_FMT
> -                              ", non-file bytes: %" APR_SIZE_T_FMT ", eor "
> -                              "buckets: %d, morphing buckets: %d",
> -                              flush_upto == NULL ? " so far"
> -                                                 : " since last flush point",
> -                              bytes_in_brigade,
> -                              non_file_bytes_in_brigade,
> -                              eor_buckets_in_brigade,
> -                              morphing_bucket_in_brigade);
> +
> +            should_send_brigade(bb, c, &flush);
> +            if (flush) {
> +                apr_int32_t nfd;
> +                apr_pollfd_t pfd;
> +                memset(&pfd, 0, sizeof(pfd));
> +                pfd.reqevents = APR_POLLOUT;
> +                pfd.desc_type = APR_POLL_SOCKET;
> +                pfd.desc.s = sock;
> +                pfd.p = c->pool;
> +                do {
> +                    rv = apr_poll(&pfd, 1, &nfd, sock_timeout);
> +                } while (APR_STATUS_IS_EINTR(rv));
>              }
> -            /*
> -             * Defer the actual blocking write to avoid doing many writes.
> -             */
> -            flush_upto = next;
> +        } while (flush);

Hm, doesn't that loop forever if the socket never becomes writable again?
We don't check the result of the above apr_poll call to see whether we
actually got an event or merely hit the timeout.
Shouldn't we leave the outer while loop (the while (flush)) if apr_poll
returns APR_TIMEUP?
Otherwise I assume that send_brigade_nonblocking will just keep returning
APR_EAGAIN and we will poll again with the full timeout.

Furthermore, doesn't that open the door to a DoS if the client reads only a
single byte shortly before the timeout is hit? But I think we had the same
problem with the old code; we would need a mod_reqtimeout-like check for a
minimum transfer rate and probably a maximum overall timeout.

>  
> -            bytes_in_brigade = 0;
> -            non_file_bytes_in_brigade = 0;
> -            eor_buckets_in_brigade = 0;
> -            morphing_bucket_in_brigade = 0;
> -        }
> +        /* Restore original socket timeout before leaving. */
> +        apr_socket_timeout_set(sock, sock_timeout);
>      }
>  

Regards

RĂ¼diger

Reply via email to