On 7/2/20 2:14 AM, [email protected] wrote:
> Author: ylavic
> Date: Thu Jul  2 00:14:26 2020
> New Revision: 1879419
> 
> URL: http://svn.apache.org/viewvc?rev=1879419&view=rev
> Log:
> mod_proxy_http: handle async tunneling of Upgrade(d) protocols.
> 
> When supported by the MPM (i.e. "event"), provide async callbacks and let
> them be scheduled by ap_mpm_register_poll_callback_timeout(), while the
> handler returns SUSPENDED.
> 
> The new ProxyAsyncDelay directive (if positive) enables async handling,
> while ProxyAsyncIdleTimeout determines the timeout applied on both ends
> while tunneling.
> 
> Github: closes #126
> 
> 
> Modified:
>     httpd/httpd/trunk/modules/proxy/mod_proxy.c
>     httpd/httpd/trunk/modules/proxy/mod_proxy.h
>     httpd/httpd/trunk/modules/proxy/mod_proxy_http.c
> 

> Modified: httpd/httpd/trunk/modules/proxy/mod_proxy_http.c
> URL: 
> http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/proxy/mod_proxy_http.c?rev=1879419&r1=1879418&r2=1879419&view=diff
> ==============================================================================
> --- httpd/httpd/trunk/modules/proxy/mod_proxy_http.c (original)
> +++ httpd/httpd/trunk/modules/proxy/mod_proxy_http.c Thu Jul  2 00:14:26 2020

> @@ -229,29 +233,129 @@ typedef enum {
>  typedef struct {
>      apr_pool_t *p;
>      request_rec *r;
> +    const char *proto;
>      proxy_worker *worker;
> +    proxy_dir_conf *dconf;
>      proxy_server_conf *sconf;
> -
>      char server_portstr[32];
> +
>      proxy_conn_rec *backend;
>      conn_rec *origin;
>  
>      apr_bucket_alloc_t *bucket_alloc;
>      apr_bucket_brigade *header_brigade;
>      apr_bucket_brigade *input_brigade;
> +
>      char *old_cl_val, *old_te_val;
>      apr_off_t cl_val;
>  
> +    proxy_http_state state;
>      rb_methods rb_method;
>  
> -    int force10;
>      const char *upgrade;
> -
> -    int expecting_100;
> -    unsigned int do_100_continue:1,
> -                 prefetch_nonblocking:1;
> +    proxy_tunnel_rec *tunnel;
> +    apr_array_header_t *pfds;
> +    apr_interval_time_t idle_timeout;
> +
> +    unsigned int can_go_async           :1,
> +                 expecting_100          :1,
> +                 do_100_continue        :1,
> +                 prefetch_nonblocking   :1,
> +                 force10                :1;
>  } proxy_http_req_t;
>  
> +static void proxy_http_async_finish(proxy_http_req_t *req)
> +{ 
> +    conn_rec *c = req->r->connection;
> +
> +    ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, req->r,
> +                  "proxy %s: finish async", req->proto);
> +
> +    proxy_run_detach_backend(req->r, req->backend);
> +    ap_proxy_release_connection(req->proto, req->backend, req->r->server);
> +
> +    ap_finalize_request_protocol(req->r);
> +    ap_process_request_after_handler(req->r);
> +    /* don't touch req or req->r from here */
> +
> +    c->cs->state = CONN_STATE_LINGER;
> +    ap_mpm_resume_suspended(c);
> +}
> +
> +/* If neither socket becomes readable in the specified timeout,
> + * this callback will kill the request.
> + * We do not have to worry about having a cancel and a IO both queued.
> + */
> +static void proxy_http_async_cancel_cb(void *baton)
> +{ 
> +    proxy_http_req_t *req = (proxy_http_req_t *)baton;
> +
> +    ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, req->r,
> +                  "proxy %s: cancel async", req->proto);
> +
> +    req->r->connection->keepalive = AP_CONN_CLOSE;
> +    req->backend->close = 1;
> +    proxy_http_async_finish(req);
> +}
> +
> +/* Invoked by the event loop when data is ready on either end. 
> + * We don't need the invoke_mtx, since we never put multiple callback events
> + * in the queue.
> + */
> +static void proxy_http_async_cb(void *baton)
> +{ 
> +    proxy_http_req_t *req = (proxy_http_req_t *)baton;
> +    int status;
> +
> +    if (req->pfds) {
> +        apr_pool_clear(req->pfds->pool);
> +    }
> +
> +    switch (req->state) {
> +    case PROXY_HTTP_TUNNELING:
> +        /* Pump both ends until they'd block and then start over again */
> +        status = ap_proxy_tunnel_run(req->tunnel);
> +        if (status == HTTP_GATEWAY_TIME_OUT) {
> +            if (req->pfds) {
> +                apr_pollfd_t *async_pfds = (void *)req->pfds->elts;
> +                apr_pollfd_t *tunnel_pfds = (void *)req->tunnel->pfds->elts;
> +                async_pfds[0].reqevents = tunnel_pfds[0].reqevents;
> +                async_pfds[1].reqevents = tunnel_pfds[1].reqevents;

What is the purpose of this?
async_pfds and tunnel_pfds are local to this block and cannot be used outside
this block.

> +            }
> +            else {
> +                req->pfds = apr_array_copy(req->p, req->tunnel->pfds);
> +                apr_pool_create(&req->pfds->pool, req->p);

Why first use baton->r->pool to create the copy and then set the pool of
the array to the new pool?

> +            }
> +            status = SUSPENDED;
> +        }
> +        break;
> +
> +    default:
> +        ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, req->r,
> +                      "proxy %s: unexpected async state (%i)",
> +                      req->proto, (int)req->state);
> +        status = HTTP_INTERNAL_SERVER_ERROR;
> +        break;
> +    }
> +
> +    if (status == SUSPENDED) {
> +        ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, req->r,
> +                      "proxy %s: suspended, going async",
> +                      req->proto);
> +
> +        ap_mpm_register_poll_callback_timeout(req->pfds,
> +                                              proxy_http_async_cb, 
> +                                              proxy_http_async_cancel_cb, 
> +                                              req, req->idle_timeout);
> +    }
> +    else if (status != OK) {
> +        proxy_http_async_cancel_cb(req);
> +    }
> +    else {
> +        proxy_http_async_finish(req);
> +    }
> +}
> +
>  /* Read what's in the client pipe. If nonblocking is set and read is EAGAIN,
>   * pass a FLUSH bucket to the backend and read again in blocking mode.
>   */

Regards

RĂ¼diger

Reply via email to