Updated Branches: refs/heads/master e9cd43b8a -> e768cb61c
TS-1496: Enable per transaction flow control Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/e768cb61 Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/e768cb61 Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/e768cb61 Branch: refs/heads/master Commit: e768cb61c335d8edafdc2cf17b4553cb490e49e4 Parents: e9cd43b Author: Alan M. Carroll <[email protected]> Authored: Fri Feb 22 19:54:11 2013 -0600 Committer: Alan M. Carroll <[email protected]> Committed: Wed Jun 12 13:55:49 2013 -0500 ---------------------------------------------------------------------- CHANGES | 2 + iocore/eventsystem/I_IOBuffer.h | 7 + iocore/eventsystem/P_IOBuffer.h | 15 ++ mgmt/RecordsConfig.cc | 6 + proxy/InkAPI.cc | 18 ++ proxy/InkAPITest.cc | 4 + proxy/Transform.cc | 27 ++- proxy/Transform.h | 28 +++ proxy/TransformInternal.h | 7 +- proxy/api/ts/ts.h.in | 13 +- proxy/http/HttpConfig.cc | 19 ++ proxy/http/HttpConfig.h | 5 +- proxy/http/HttpDebugNames.cc | 2 + proxy/http/HttpSM.cc | 102 ++++++++-- proxy/http/HttpSM.h | 6 + proxy/http/HttpTunnel.cc | 376 ++++++++++++++++++----------------- proxy/http/HttpTunnel.h | 150 +++++++++++++- 17 files changed, 572 insertions(+), 215 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e768cb61/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 065ab60..5f00fba 100644 --- a/CHANGES +++ b/CHANGES @@ -6,6 +6,8 @@ *) [TS-1942] Remove username.cache configs, they are obsolete and long gone. + *) [TS-1496] Enable per transaction flow control. 
+ Changes with Apache Traffic Server 3.3.4 http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e768cb61/iocore/eventsystem/I_IOBuffer.h ---------------------------------------------------------------------- diff --git a/iocore/eventsystem/I_IOBuffer.h b/iocore/eventsystem/I_IOBuffer.h index 5b44e17..baa1c54 100644 --- a/iocore/eventsystem/I_IOBuffer.h +++ b/iocore/eventsystem/I_IOBuffer.h @@ -578,6 +578,12 @@ public: */ int64_t read_avail(); + /** Check if there is more than @a size bytes available to read. + @return @c true if more than @a size byte are available. + */ + bool is_read_avail_more_than(int64_t size); + + /** Number of IOBufferBlocks with data in the block list. Returns the number of IOBufferBlocks on the block list with data remaining for @@ -1071,6 +1077,7 @@ public: return !_writer; } int64_t max_read_avail(); + int max_block_count(); void check_add_block(); http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e768cb61/iocore/eventsystem/P_IOBuffer.h ---------------------------------------------------------------------- diff --git a/iocore/eventsystem/P_IOBuffer.h b/iocore/eventsystem/P_IOBuffer.h index 98bdda2..261aa1f 100644 --- a/iocore/eventsystem/P_IOBuffer.h +++ b/iocore/eventsystem/P_IOBuffer.h @@ -633,6 +633,21 @@ IOBufferReader::read_avail() return t; } +inline bool +IOBufferReader::is_read_avail_more_than(int64_t size) +{ + int64_t t = -start_offset; + IOBufferBlock* b = block; + while (b) { + t += b->read_avail(); + if (t > size) { + return true; + } + b = b->next; + } + return false; +} + TS_INLINE void IOBufferReader::consume(int64_t n) { http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e768cb61/mgmt/RecordsConfig.cc ---------------------------------------------------------------------- diff --git a/mgmt/RecordsConfig.cc b/mgmt/RecordsConfig.cc index 0ab0b94..fbc101a 100644 --- a/mgmt/RecordsConfig.cc +++ b/mgmt/RecordsConfig.cc @@ -429,6 +429,12 @@ RecordElement RecordsConfig[] = { , {RECT_CONFIG, 
"proxy.config.http.chunking.size", RECD_INT, "4096", RECU_DYNAMIC, RR_NULL, RECC_NULL, NULL, RECA_NULL} , + {RECT_CONFIG, "proxy.config.http.flow_control.enabled", RECD_INT, "0", RECU_DYNAMIC, RR_NULL, RECC_NULL, NULL, RECA_NULL} + , + {RECT_CONFIG, "proxy.config.http.flow_control.high_water", RECD_INT, "0", RECU_DYNAMIC, RR_NULL, RECC_NULL, NULL, RECA_NULL} + , + {RECT_CONFIG, "proxy.config.http.flow_control.low_water", RECD_INT, "0", RECU_DYNAMIC, RR_NULL, RECC_NULL, NULL, RECA_NULL} + , {RECT_CONFIG, "proxy.config.http.session_auth_cache_keep_alive_enabled", RECD_INT, "1", RECU_DYNAMIC, RR_NULL, RECC_NULL, NULL, RECA_NULL} , // # Send http11 requests http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e768cb61/proxy/InkAPI.cc ---------------------------------------------------------------------- diff --git a/proxy/InkAPI.cc b/proxy/InkAPI.cc index 8004a1e..f598b47 100644 --- a/proxy/InkAPI.cc +++ b/proxy/InkAPI.cc @@ -7583,6 +7583,17 @@ _conf_to_memberp(TSOverridableConfigKey conf, HttpSM* sm, OverridableDataType *t typ = OVERRIDABLE_TYPE_INT; ret = &sm->t_state.txn_conf->http_chunking_size; break; + case TS_CONFIG_HTTP_FLOW_CONTROL_ENABLED: + ret = &sm->t_state.txn_conf->flow_control_enabled; + break; + case TS_CONFIG_HTTP_FLOW_CONTROL_HIGH_WATER_MARK: + typ = OVERRIDABLE_TYPE_INT; + ret = &sm->t_state.txn_conf->flow_high_water_mark; + break; + case TS_CONFIG_HTTP_FLOW_CONTROL_LOW_WATER_MARK: + typ = OVERRIDABLE_TYPE_INT; + ret = &sm->t_state.txn_conf->flow_low_water_mark; + break; // This helps avoiding compiler warnings, yet detect unhandled enum members. 
case TS_CONFIG_NULL: @@ -7816,6 +7827,9 @@ TSHttpTxnConfigFind(const char* name, int length, TSOverridableConfigKey *conf, case 'd': if (!strncmp(name, "proxy.config.http.server_tcp_init_cwnd", length)) cnf = TS_CONFIG_HTTP_SERVER_TCP_INIT_CWND; + else if (!strncmp(name, "proxy.config.http.flow_control.enabled", length)) + cnf = TS_CONFIG_HTTP_FLOW_CONTROL_ENABLED; + break; break; case 's': if (!strncmp(name, "proxy.config.http.send_http11_requests", length)) @@ -7856,6 +7870,8 @@ TSHttpTxnConfigFind(const char* name, int length, TSOverridableConfigKey *conf, cnf = TS_CONFIG_URL_REMAP_PRISTINE_HOST_HDR; else if (!strncmp(name, "proxy.config.http.insert_request_via_str", length)) cnf = TS_CONFIG_HTTP_INSERT_REQUEST_VIA_STR; + else if (!strncmp(name, "proxy.config.http.flow_control.low_water", length)) + cnf = TS_CONFIG_HTTP_FLOW_CONTROL_LOW_WATER_MARK; break; case 's': if (!strncmp(name, "proxy.config.http.origin_max_connections", length)) @@ -7887,6 +7903,8 @@ TSHttpTxnConfigFind(const char* name, int length, TSOverridableConfigKey *conf, case 'r': if (!strncmp(name, "proxy.config.http.insert_response_via_str", length)) cnf = TS_CONFIG_HTTP_INSERT_RESPONSE_VIA_STR; + else if (!strncmp(name, "proxy.config.http.flow_control.high_water", length)) + cnf = TS_CONFIG_HTTP_FLOW_CONTROL_HIGH_WATER_MARK; break; } break; http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e768cb61/proxy/InkAPITest.cc ---------------------------------------------------------------------- diff --git a/proxy/InkAPITest.cc b/proxy/InkAPITest.cc index 011e2d3..a1cdf8e 100644 --- a/proxy/InkAPITest.cc +++ b/proxy/InkAPITest.cc @@ -5599,6 +5599,7 @@ typedef enum ORIG_TS_HTTP_CACHE_LOOKUP_COMPLETE_HOOK, ORIG_TS_HTTP_PRE_REMAP_HOOK, ORIG_TS_HTTP_POST_REMAP_HOOK, + ORIG_TS_HTTP_RESPONSE_CLIENT_HOOK, ORIG_TS_HTTP_LAST_HOOK } ORIG_TSHttpHookID; @@ -7451,6 +7452,9 @@ const char *SDK_Overridable_Configs[TS_CONFIG_LAST_ENTRY] = { "proxy.config.net.sock_packet_tos_out", 
"proxy.config.http.insert_age_in_response", "proxy.config.http.chunking.size", + "proxy.config.http.flow_control.enabled", + "proxy.config.http.flow_control.low_water", + "proxy.config.http.flow_control.high_water" }; REGRESSION_TEST(SDK_API_OVERRIDABLE_CONFIGS) (RegressionTest * test, int atype, int *pstatus) http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e768cb61/proxy/Transform.cc ---------------------------------------------------------------------- diff --git a/proxy/Transform.cc b/proxy/Transform.cc index 80f3dff..da3e51c 100644 --- a/proxy/Transform.cc +++ b/proxy/Transform.cc @@ -398,7 +398,7 @@ TransformTerminus::reenable(VIO *vio) -------------------------------------------------------------------------*/ TransformVConnection::TransformVConnection(Continuation *cont, APIHook *hooks) -:VConnection(cont->mutex), m_cont(cont), m_terminus(this), m_closed(0) +:TransformVCChain(cont->mutex), m_cont(cont), m_terminus(this), m_closed(0) { INKVConnInternal *xform; @@ -506,6 +506,31 @@ TransformVConnection::reenable(VIO *vio) ink_assert(!"not reached"); } +/*------------------------------------------------------------------------- + -------------------------------------------------------------------------*/ + +uint64_t +TransformVConnection::backlog(uint64_t limit) +{ + uint64_t b = 0; // backlog + VConnection* raw_vc = m_transform; + MIOBuffer* w; + while (raw_vc && raw_vc != &m_terminus) { + INKVConnInternal* vc = static_cast<INKVConnInternal*>(raw_vc); + if (0 != (w = vc->m_read_vio.buffer.writer())) + b += w->max_read_avail(); + if (b >= limit) return b; + raw_vc = vc->m_output_vc; + } + if (0 != (w = m_terminus.m_read_vio.buffer.writer())) + b += w->max_read_avail(); + if (b >= limit) return b; + + IOBufferReader* r = m_terminus.m_write_vio.get_reader(); + if (r) + b += r->read_avail(); + return b; +} /*------------------------------------------------------------------------- 
-------------------------------------------------------------------------*/ http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e768cb61/proxy/Transform.h ---------------------------------------------------------------------- diff --git a/proxy/Transform.h b/proxy/Transform.h index bbe6bab..26ac2be 100644 --- a/proxy/Transform.h +++ b/proxy/Transform.h @@ -60,6 +60,34 @@ public: }; #endif +/** A protocol class. + This provides transform VC specific methods for external access + without exposing internals or requiring extra includes. +*/ +class TransformVCChain : public VConnection +{ + protected: + /// Required constructor + TransformVCChain(ProxyMutex* m); + public: + /** Compute the backlog. This is the amount of data ready to read + for each element of the chain. If @a limit is non-negative then + the method will return as soon as the computed backlog is at + least that large. This provides for more efficient checking if + the caller is interested only in whether the backlog is at least + @a limit. The default is to accurately compute the backlog. 
+ */ + virtual uint64_t backlog( + uint64_t limit = INTU64_MAX ///< Maximum value of interest + ) = 0; +}; + +inline +TransformVCChain::TransformVCChain(ProxyMutex* m) + : VConnection(m) +{ +} + /////////////////////////////////////////////////////////////////// /// RangeTransform implementation /// handling Range requests from clients http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e768cb61/proxy/TransformInternal.h ---------------------------------------------------------------------- diff --git a/proxy/TransformInternal.h b/proxy/TransformInternal.h index 8b03529..873561e 100644 --- a/proxy/TransformInternal.h +++ b/proxy/TransformInternal.h @@ -59,7 +59,7 @@ public: }; -class TransformVConnection:public VConnection +class TransformVConnection:public TransformVCChain { public: TransformVConnection(Continuation * cont, APIHook * hooks); @@ -74,6 +74,11 @@ public: void reenable(VIO * vio); + /** Compute the backlog. + @return The actual backlog, or a value at least @a limit. + */ + virtual uint64_t backlog(uint64_t limit = INTU64_MAX); + public: VConnection * m_transform; Continuation *m_cont; http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e768cb61/proxy/api/ts/ts.h.in ---------------------------------------------------------------------- diff --git a/proxy/api/ts/ts.h.in b/proxy/api/ts/ts.h.in index 6741d79..c533068 100644 --- a/proxy/api/ts/ts.h.in +++ b/proxy/api/ts/ts.h.in @@ -210,8 +210,10 @@ extern "C" continuation for a particular hook are: TSHttpHookAdd: adds a global hook. You can globally add - any hook except for TS_HTTP_REQUEST_TRANSFORM_HOOK and - TS_HTTP_RESPONSE_TRANSFORM_HOOK. 
+ any hook except for + - TS_HTTP_REQUEST_TRANSFORM_HOOK + - TS_HTTP_RESPONSE_TRANSFORM_HOOK + - TS_HTTP_RESPONSE_CLIENT_HOOK The following hooks can ONLY be added globally: - TS_HTTP_SELECT_ALT_HOOK @@ -228,6 +230,7 @@ extern "C" - TS_HTTP_SEND_RESPONSE_HDR_HOOK - TS_HTTP_REQUEST_TRANSFORM_HOOK - TS_HTTP_RESPONSE_TRANSFORM_HOOK + - TS_HTTP_RESPONSE_CLIENT_HOOK - TS_HTTP_TXN_START_HOOK - TS_HTTP_TXN_CLOSE_HOOK @@ -268,6 +271,7 @@ extern "C" TS_HTTP_CACHE_LOOKUP_COMPLETE_HOOK, TS_HTTP_PRE_REMAP_HOOK, TS_HTTP_POST_REMAP_HOOK, + TS_HTTP_RESPONSE_CLIENT_HOOK, TS_HTTP_LAST_HOOK } TSHttpHookID; #define TS_HTTP_READ_REQUEST_PRE_REMAP_HOOK TS_HTTP_PRE_REMAP_HOOK /* backwards compat */ @@ -612,7 +616,10 @@ extern "C" TS_CONFIG_NET_SOCK_PACKET_TOS_OUT, TS_CONFIG_HTTP_INSERT_AGE_IN_RESPONSE, TS_CONFIG_HTTP_CHUNKING_SIZE, - TS_CONFIG_LAST_ENTRY + TS_CONFIG_HTTP_FLOW_CONTROL_ENABLED, + TS_CONFIG_HTTP_FLOW_CONTROL_LOW_WATER_MARK, + TS_CONFIG_HTTP_FLOW_CONTROL_HIGH_WATER_MARK, + TS_CONFIG_LAST_ENTRY, } TSOverridableConfigKey; /* The TASK pool of threads is the primary method of off-loading continuations from the http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e768cb61/proxy/http/HttpConfig.cc ---------------------------------------------------------------------- diff --git a/proxy/http/HttpConfig.cc b/proxy/http/HttpConfig.cc index 21f6fd0..a1f7361 100644 --- a/proxy/http/HttpConfig.cc +++ b/proxy/http/HttpConfig.cc @@ -1181,6 +1181,9 @@ HttpConfig::startup() HttpEstablishStaticConfigByte(c.oride.keep_alive_enabled_out, "proxy.config.http.keep_alive_enabled_out"); HttpEstablishStaticConfigByte(c.oride.chunking_enabled, "proxy.config.http.chunking_enabled"); HttpEstablishStaticConfigLongLong(c.oride.http_chunking_size, "proxy.config.http.chunking.size"); + HttpEstablishStaticConfigByte(c.oride.flow_control_enabled, "proxy.config.http.flow_control.enabled"); + HttpEstablishStaticConfigLongLong(c.oride.flow_high_water_mark, "proxy.config.http.flow_control.high_water"); + 
HttpEstablishStaticConfigLongLong(c.oride.flow_low_water_mark, "proxy.config.http.flow_control.low_water"); HttpEstablishStaticConfigByte(c.session_auth_cache_keep_alive_enabled, "proxy.config.http.session_auth_cache_keep_alive_enabled"); HttpEstablishStaticConfigLongLong(c.origin_server_pipeline, "proxy.config.http.origin_server_pipeline"); @@ -1456,6 +1459,22 @@ HttpConfig::reconfigure() params->oride.keep_alive_enabled_out = INT_TO_BOOL(m_master.oride.keep_alive_enabled_out); params->oride.chunking_enabled = INT_TO_BOOL(m_master.oride.chunking_enabled); params->oride.http_chunking_size = m_master.oride.http_chunking_size; + + params->oride.flow_control_enabled = INT_TO_BOOL(m_master.oride.flow_control_enabled); + params->oride.flow_high_water_mark = m_master.oride.flow_high_water_mark; + params->oride.flow_low_water_mark = m_master.oride.flow_low_water_mark; + // If not set (zero) then make values the same. + if (params->oride.flow_low_water_mark <= 0) + params->oride.flow_low_water_mark = params->oride.flow_high_water_mark; + if (params->oride.flow_high_water_mark <= 0) + params->oride.flow_high_water_mark = params->oride.flow_low_water_mark; + if (params->oride.flow_high_water_mark < params->oride.flow_low_water_mark) { + Warning("Flow control low water mark is greater than high water mark, flow control disabled"); + params->oride.flow_control_enabled = 0; + // zero means "hardwired default" when actually used. 
+ params->oride.flow_high_water_mark = params->oride.flow_low_water_mark = 0; + } + params->session_auth_cache_keep_alive_enabled = INT_TO_BOOL(m_master.session_auth_cache_keep_alive_enabled); params->origin_server_pipeline = m_master.origin_server_pipeline; params->user_agent_pipeline = m_master.user_agent_pipeline; http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e768cb61/proxy/http/HttpConfig.h ---------------------------------------------------------------------- diff --git a/proxy/http/HttpConfig.h b/proxy/http/HttpConfig.h index 024154d..22e2c92 100644 --- a/proxy/http/HttpConfig.h +++ b/proxy/http/HttpConfig.h @@ -431,7 +431,7 @@ struct OverridableHttpConfigParams { freshness_fuzz_time(0), freshness_fuzz_min_time(0), max_cache_open_read_retries(0), cache_open_read_retry_time(0), background_fill_active_timeout(0), - http_chunking_size(0), + http_chunking_size(0), flow_high_water_mark(0), flow_low_water_mark(0), // Strings / floats must come last proxy_response_server_string(NULL), proxy_response_server_string_len(0), @@ -506,6 +506,7 @@ struct OverridableHttpConfigParams { // DOC IN CACHE NO DNS// ////////////////////// MgmtByte doc_in_cache_skip_dns; + MgmtByte flow_control_enabled; MgmtInt negative_caching_lifetime; @@ -567,6 +568,8 @@ struct OverridableHttpConfigParams { MgmtInt background_fill_active_timeout; MgmtInt http_chunking_size; // Maximum chunk size for chunked output. + MgmtInt flow_high_water_mark; ///< Flow control high water mark. + MgmtInt flow_low_water_mark; ///< Flow control low water mark. // IMPORTANT: Here comes all strings / floats configs. 
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e768cb61/proxy/http/HttpDebugNames.cc ---------------------------------------------------------------------- diff --git a/proxy/http/HttpDebugNames.cc b/proxy/http/HttpDebugNames.cc index 3703283..c296dde 100644 --- a/proxy/http/HttpDebugNames.cc +++ b/proxy/http/HttpDebugNames.cc @@ -486,6 +486,8 @@ HttpDebugNames::get_api_hook_name(TSHttpHookID t) return "TS_HTTP_PRE_REMAP_HOOK"; case TS_HTTP_POST_REMAP_HOOK: return "TS_HTTP_POST_REMAP_HOOK"; + case TS_HTTP_RESPONSE_CLIENT_HOOK: + return "TS_HTTP_RESPONSE_CLIENT_HOOK"; case TS_HTTP_LAST_HOOK: return "TS_HTTP_LAST_HOOK"; } http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e768cb61/proxy/http/HttpSM.cc ---------------------------------------------------------------------- diff --git a/proxy/http/HttpSM.cc b/proxy/http/HttpSM.cc index 38a489f..8035b38 100644 --- a/proxy/http/HttpSM.cc +++ b/proxy/http/HttpSM.cc @@ -319,7 +319,8 @@ HttpSM::HttpSM() ua_raw_buffer_reader(NULL), server_entry(NULL), server_session(NULL), shared_session_retries(0), server_buffer_reader(NULL), - transform_info(), post_transform_info(), second_cache_sm(NULL), + transform_info(), post_transform_info(), has_active_plugin_agents(false), + second_cache_sm(NULL), default_handler(NULL), pending_action(NULL), historical_action(NULL), last_action(HttpTransact::STATE_MACHINE_ACTION_UNDEFINED), client_request_hdr_bytes(0), client_request_body_bytes(0), @@ -2966,9 +2967,10 @@ HttpSM::is_bg_fill_necessary(HttpTunnelConsumer * c) // There must be another consumer for it to worthwhile to // set up a background fill - if (c->producer->num_consumers > 1 && - (c->producer->vc_type == HT_HTTP_SERVER || c->producer->vc_type == HT_TRANSFORM) && + if (((c->producer->num_consumers > 1 && c->producer->vc_type == HT_HTTP_SERVER) || + (c->producer->num_consumers > 1 && c->producer->vc_type == HT_TRANSFORM)) && c->producer->alive == true) { + // If threshold is 0.0 or negative then do background // 
fill regardless of the content length. Since this // is floating point just make sure the number is near zero @@ -2988,7 +2990,7 @@ HttpSM::is_bg_fill_necessary(HttpTunnelConsumer * c) if (pDone <= 1.0 && pDone > t_state.txn_conf->background_fill_threshold) { return true; } else { - DebugSM("http", "[%" PRId64 "] no background. Only %%%f done", sm_id, pDone); + DebugSM("http", "[%" PRId64 "] no background. Only %%%f of %%%f done [%" PRId64 " / %" PRId64" ]", sm_id, pDone, t_state.txn_conf->background_fill_threshold, ua_body_done, ua_cl); } } @@ -3027,8 +3029,8 @@ HttpSM::tunnel_handler_ua(int event, HttpTunnelConsumer * c) // There is another consumer (cache write) so // detach the user agent - ink_assert(server_entry->vc == c->producer->vc); - ink_assert(server_session == c->producer->vc); + ink_assert(server_entry->vc == server_session); + ink_assert(c->is_downstream_from(server_session)); server_session->get_netvc()-> set_active_timeout(HRTIME_SECONDS(t_state.txn_conf->background_fill_active_timeout)); } else { @@ -3690,6 +3692,35 @@ HttpSM::tunnel_handler_transform_read(int event, HttpTunnelProducer * p) } int +HttpSM::tunnel_handler_plugin_agent(int event, HttpTunnelConsumer * c) +{ + STATE_ENTER(&HttpSM::tunnel_handler_plugin_client, event); + + switch (event) { + case VC_EVENT_ERROR: + c->vc->do_io_close(EHTTP_ERROR); // close up + // Signal producer if we're the last consumer. 
+ if (c->producer->alive && c->producer->num_consumers == 1) { + tunnel.producer_handler(HTTP_TUNNEL_EVENT_CONSUMER_DETACH, c->producer); + } + break; + case VC_EVENT_EOS: + if (c->producer->alive && c->producer->num_consumers == 1) { + tunnel.producer_handler(HTTP_TUNNEL_EVENT_CONSUMER_DETACH, c->producer); + } + // FALLTHROUGH + case VC_EVENT_WRITE_COMPLETE: + c->write_success = true; + c->vc->do_io(VIO::CLOSE); + break; + default: + ink_release_assert(0); + } + + return 0; +} + +int HttpSM::state_srv_lookup(int event, void *data) { STATE_ENTER(&HttpSM::state_srv_lookup, event); @@ -4285,7 +4316,7 @@ HttpSM::do_cache_prepare_write() inline void HttpSM::do_cache_prepare_write_transform() { - if (cache_sm.cache_write_vc != NULL || tunnel.is_there_cache_write()) + if (cache_sm.cache_write_vc != NULL || tunnel.has_cache_writer()) do_cache_prepare_action(&transform_cache_sm, NULL, false, true); else do_cache_prepare_action(&transform_cache_sm, NULL, false); @@ -4370,7 +4401,6 @@ HttpSM::do_cache_prepare_action(HttpCacheSM * c_sm, CacheHTTPInfo * object_read_ } } - ////////////////////////////////////////////////////////////////////////// // // HttpSM::do_http_server_open() @@ -4566,7 +4596,6 @@ HttpSM::do_http_server_open(bool raw) } else if (ua_session->f_outbound_transparent) { opt.addr_binding = NetVCOptions::FOREIGN_ADDR; opt.local_ip = t_state.client_info.addr; - /* If the connection is server side transparent, we can bind to the port that the client chose instead of randomly assigning one at the proxy. 
This is controlled by the 'use_client_source_port' @@ -5116,8 +5145,7 @@ HttpSM::setup_transform_to_server_transfer() &HttpSM::tunnel_handler_transform_read, HT_TRANSFORM, "post transform"); - p->self_consumer = c; - c->self_producer = p; + tunnel.chain(c,p); post_transform_info.entry->in_tunnel = true; tunnel.add_consumer(server_entry->vc, @@ -6000,14 +6028,15 @@ HttpSM::setup_transfer_from_transform() &HttpSM::tunnel_handler_transform_read, HT_TRANSFORM, "transform read"); - p->self_consumer = c; - c->self_producer = p; + tunnel.chain(c, p); tunnel.add_consumer(ua_entry->vc, transform_info.vc, &HttpSM::tunnel_handler_ua, HT_HTTP_CLIENT, "user agent"); transform_info.entry->in_tunnel = true; ua_entry->in_tunnel = true; + this->setup_plugin_agents(p); + if ( t_state.client_info.receive_chunked_response ) { tunnel.set_producer_chunking_action(p, client_response_hdr_bytes, TCA_CHUNK_CONTENT); tunnel.set_producer_chunking_size(p, t_state.txn_conf->http_chunking_size); @@ -6037,8 +6066,7 @@ HttpSM::setup_transfer_from_transform_to_cache_only() &HttpSM::tunnel_handler_transform_read, HT_TRANSFORM, "transform read"); - p->self_consumer = c; - c->self_producer = p; + tunnel.chain(c, p); transform_info.entry->in_tunnel = true; @@ -6108,7 +6136,10 @@ HttpSM::setup_server_transfer() action = TCA_PASSTHRU_DECHUNKED_CONTENT; } else { if (t_state.current.server->transfer_encoding != HttpTransact::CHUNKED_ENCODING) - action = TCA_CHUNK_CONTENT; + if (t_state.client_info.http_version == HTTPVersion(0, 9)) + action = TCA_PASSTHRU_DECHUNKED_CONTENT; // send as-is + else + action = TCA_CHUNK_CONTENT; else action = TCA_PASSTHRU_CHUNKED_CONTENT; } @@ -6141,6 +6172,8 @@ HttpSM::setup_server_transfer() ua_entry->in_tunnel = true; server_entry->in_tunnel = true; + this->setup_plugin_agents(p); + // If the incoming server response is chunked and the client does not // expect a chunked response, then dechunk it. 
Otherwise, if the // incoming response is not chunked and the client expects a chunked @@ -6257,10 +6290,8 @@ HttpSM::setup_blind_tunnel(bool send_response_hdr) &HttpSM::tunnel_handler_ssl_consumer, HT_HTTP_SERVER, "http server - tunnel"); // Make the tunnel aware that the entries are bi-directional - p_os->self_consumer = c_os; - p_ua->self_consumer = c_ua; - c_ua->self_producer = p_ua; - c_os->self_producer = p_os; + tunnel.chain(c_os, p_os); + tunnel.chain(c_ua, p_ua); ua_entry->in_tunnel = true; server_entry->in_tunnel = true; @@ -6268,6 +6299,20 @@ HttpSM::setup_blind_tunnel(bool send_response_hdr) tunnel.tunnel_run(); } +void +HttpSM::setup_plugin_agents(HttpTunnelProducer* p) +{ + APIHook* agent = txn_hook_get(TS_HTTP_RESPONSE_CLIENT_HOOK); + has_active_plugin_agents = agent != 0; + while (agent) { + INKVConnInternal* contp = static_cast<INKVConnInternal*>(agent->m_cont); + tunnel.add_consumer(contp, p->vc, &HttpSM::tunnel_handler_plugin_agent, HT_HTTP_CLIENT, "plugin agent"); + // We don't put these in the SM VC table because the tunnel + // will clean them up in do_io_close(). + agent = agent->next(); + } +} + inline void HttpSM::transform_cleanup(TSHttpHookID hook, HttpTransformInfo * info) { @@ -6281,6 +6326,22 @@ HttpSM::transform_cleanup(TSHttpHookID hook, HttpTransformInfo * info) } } +void +HttpSM::plugin_agents_cleanup() +{ + // If this is set then all of the plugin agent VCs were put in + // the VC table and cleaned up there. This handles the case where + // something went wrong early. 
+ if (!has_active_plugin_agents) { + APIHook* agent = txn_hook_get(TS_HTTP_RESPONSE_CLIENT_HOOK); + while (agent) { + INKVConnInternal* contp = static_cast<INKVConnInternal*>(agent->m_cont); + contp->do_io_close(); + agent = agent->next(); + } + } +} + ////////////////////////////////////////////////////////////////////////// // // HttpSM::kill_this() @@ -6326,6 +6387,7 @@ HttpSM::kill_this() if (hooks_set) { transform_cleanup(TS_HTTP_RESPONSE_TRANSFORM_HOOK, &transform_info); transform_cleanup(TS_HTTP_REQUEST_TRANSFORM_HOOK, &post_transform_info); + plugin_agents_cleanup(); } // It's also possible that the plugin_tunnel vc was never // executed due to not contacting the server http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e768cb61/proxy/http/HttpSM.h ---------------------------------------------------------------------- diff --git a/proxy/http/HttpSM.h b/proxy/http/HttpSM.h index 81e1e07..d05fb4d 100644 --- a/proxy/http/HttpSM.h +++ b/proxy/http/HttpSM.h @@ -322,6 +322,9 @@ protected: HttpTransformInfo transform_info; HttpTransformInfo post_transform_info; + /// Set if plugin client / user agents are active. + /// Need primarily for cleanup. 
+ bool has_active_plugin_agents; HttpCacheSM cache_sm; HttpCacheSM transform_cache_sm; @@ -397,6 +400,7 @@ protected: int tunnel_handler_ssl_consumer(int event, HttpTunnelConsumer * p); int tunnel_handler_transform_write(int event, HttpTunnelConsumer * c); int tunnel_handler_transform_read(int event, HttpTunnelProducer * p); + int tunnel_handler_plugin_agent(int event, HttpTunnelConsumer * c); void do_hostdb_lookup(); void do_hostdb_reverse_lookup(); @@ -458,6 +462,7 @@ protected: HttpTunnelProducer *setup_transfer_from_transform(); HttpTunnelProducer *setup_cache_transfer_to_transform(); HttpTunnelProducer *setup_transfer_from_transform_to_cache_only(); + void setup_plugin_agents(HttpTunnelProducer* p); HttpTransact::StateMachineAction_t last_action; int (HttpSM::*m_last_state) (int event, void *data); @@ -517,6 +522,7 @@ protected: void update_stats(); void transform_cleanup(TSHttpHookID hook, HttpTransformInfo * info); bool is_transparent_passthrough_allowed(); + void plugin_agents_cleanup(); public: LINK(HttpSM, debug_link); http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e768cb61/proxy/http/HttpTunnel.cc ---------------------------------------------------------------------- diff --git a/proxy/http/HttpTunnel.cc b/proxy/http/HttpTunnel.cc index d527a30..8e8ba53 100644 --- a/proxy/http/HttpTunnel.cc +++ b/proxy/http/HttpTunnel.cc @@ -37,8 +37,6 @@ #include "HttpDebugNames.h" #include "ParseRules.h" - -static const int max_chunked_ahead_bytes = 1 << 15; static const int max_chunked_ahead_blocks = 128; static const int min_block_transfer_bytes = 256; static char const * const CHUNK_HEADER_FMT = "%" PRIx64"\r\n"; @@ -47,81 +45,18 @@ static char const * const CHUNK_HEADER_FMT = "%" PRIx64"\r\n"; // a block in the input stream. static int const CHUNK_IOBUFFER_SIZE_INDEX = MIN_IOBUFFER_SIZE; -static void -chunked_reenable(HttpTunnelProducer * p, HttpTunnel * tunnel) -{ - - // FIX ME: still need to deal with huge chunk sizes. 
If a chunk - // is 1GB, we will currently buffer the whole thing - - if (p->chunked_handler.state != ChunkedHandler::CHUNK_FLOW_CONTROL) { - p->read_vio->reenable(); - } else { - // If we are in are in the flow control, there's data in - // the incoming buffer that we haven't processed yet - // Only process it if we determine the client isn't overflowed - MIOBuffer *dbuf = p->chunked_handler.dechunked_buffer; - - if (dbuf->max_read_avail() < max_chunked_ahead_bytes && dbuf->max_block_count() < max_chunked_ahead_blocks) { - // Flow control no longer needed. We only initiate flow control - // after completing a chunk so we know the next state is - // CHUNK_READ_SIZE_START - Debug("http_chunk_flow", "Suspending flow control"); - p->chunked_handler.state = ChunkedHandler::CHUNK_READ_SIZE_START; - - // Call back the tunnel as if we've received more data from the server - int r = tunnel->main_handler(p->chunked_handler.last_server_event, p->read_vio); - - // Only actually reenable the server if we've stayed out of the - // flow control state. 
The callout may have killed the vc - // and/or the vio so check that the producer is still alive - // (INKqa05512) - // Also, make sure the tunnel has not been deallocated on - // the call to tunnel->main_handler - if (r == EVENT_CONT && p->alive && p->chunked_handler.state != ChunkedHandler::CHUNK_FLOW_CONTROL) { - // INKqa05737 - since we explicitly disabled the vc by setting - // nbytes = ndone when going into flow control, we need - // set nbytes up again here - p->read_vio->nbytes = INT64_MAX; - p->read_vio->reenable(); - } - } else { - Debug("http_chunk_flow", "Blocking reenable - flow control in effect"); - } - } -} - -static void -add_chunked_reenable(HttpTunnelProducer * p, HttpTunnel * tunnel) -{ - if (p->chunked_handler.state != ChunkedHandler::CHUNK_FLOW_CONTROL) { - p->read_vio->reenable(); - } else { - // If we are in are in the flow control, there's data in - // the incoming buffer that we haven't processed yet - // Only process it if we determine the client isn't overflowed - MIOBuffer *cbuf = p->chunked_handler.chunked_buffer; - if (cbuf->max_read_avail() < max_chunked_ahead_bytes && cbuf->max_block_count() < max_chunked_ahead_blocks) { - // Flow control no longer needed. - Debug("http_chunk_flow", "Suspending flow control on enchunking"); - p->chunked_handler.state = ChunkedHandler::CHUNK_WRITE_CHUNK; - - // Call back the tunnel as if we've received more data from - // the server - int r = tunnel->main_handler(p->chunked_handler.last_server_event, p->read_vio); - - // Only actually reenable the server if we've stayed out of the - // flow control state. 
The callout may have killed the vc - // and/or the vio so check that the producer is still alive - // Also, make sure the tunnel has not been deallocated on - // the call to tunnel->main_handler - if (r == EVENT_CONT && p->alive && p->chunked_handler.state != ChunkedHandler::CHUNK_FLOW_CONTROL) { - p->read_vio->reenable(); - } - } else { - Debug("http_chunk_flow", "Blocking reenable on enchunking - flow control in effect"); - } +char +VcTypeCode(HttpTunnelType_t t) { + char zret = ' '; + switch (t) { + case HT_HTTP_CLIENT: zret = 'U'; break; + case HT_HTTP_SERVER: zret = 'S'; break; + case HT_TRANSFORM: zret = 'T'; break; + case HT_CACHE_READ: zret = 'R'; break; + case HT_CACHE_WRITE: zret = 'W'; break; + default: break; } + return zret; } ChunkedHandler::ChunkedHandler() @@ -281,15 +216,7 @@ ChunkedHandler::read_chunk() if (bytes_left == 0) { Debug("http_chunk", "completed read of chunk of %" PRId64" bytes", cur_chunk_size); - // Check to see if we need to flow control the output - if (dechunked_buffer && - (dechunked_buffer->max_read_avail() > max_chunked_ahead_bytes || - dechunked_buffer->max_block_count() > max_chunked_ahead_blocks)) { - state = CHUNK_FLOW_CONTROL; - Debug("http_chunk_flow", "initiating flow control pause"); - } else { - state = CHUNK_READ_SIZE_START; - } + state = CHUNK_READ_SIZE_START; } else if (bytes_left > 0) { Debug("http_chunk", "read %" PRId64" bytes of an %" PRId64" chunk", b, cur_chunk_size); } @@ -301,7 +228,7 @@ ChunkedHandler::read_trailer() int64_t bytes_used; bool done = false; - while (chunked_reader->read_avail() > 0 && !done) { + while (chunked_reader->is_read_avail_more_than(0) && !done) { const char *tmp = chunked_reader->start(); int64_t data_size = chunked_reader->block_read_avail(); @@ -341,7 +268,7 @@ ChunkedHandler::read_trailer() bool ChunkedHandler::process_chunked_content() { - while (chunked_reader->read_avail() > 0 && state != CHUNK_READ_DONE && state != CHUNK_READ_ERROR) { + while 
(chunked_reader->is_read_avail_more_than(0) && state != CHUNK_READ_DONE && state != CHUNK_READ_ERROR) { switch (state) { case CHUNK_READ_SIZE: case CHUNK_READ_SIZE_CRLF: @@ -385,48 +312,36 @@ bool ChunkedHandler::generate_chunked_content() while ((r_avail = dechunked_reader->read_avail()) > 0 && state != CHUNK_WRITE_DONE) { int64_t write_val = MIN(max_chunk_size, r_avail); - // If the server is still alive, check to see if too much data is - // pilling up on the client's buffer. If the server is done, ignore - // the flow control rules so that we don't have to bother with stopping - // the io an coming a back and dealing with the server's data later - if (server_done == false && - (chunked_buffer->max_read_avail() > max_chunked_ahead_bytes || - chunked_buffer->max_block_count() > max_chunked_ahead_blocks)) { - state = CHUNK_FLOW_CONTROL; - Debug("http_chunk_flow", "initiating flow control pause on enchunking"); - return false; - } else { - state = CHUNK_WRITE_CHUNK; - Debug("http_chunk", "creating a chunk of size %" PRId64" bytes", write_val); - - // Output the chunk size. - if (write_val != max_chunk_size) { - int len = snprintf(tmp, sizeof(tmp), CHUNK_HEADER_FMT, write_val); - chunked_buffer->write(tmp, len); - chunked_size += len; - } else { - chunked_buffer->write(max_chunk_header, max_chunk_header_len); - chunked_size += max_chunk_header_len; - } + state = CHUNK_WRITE_CHUNK; + Debug("http_chunk", "creating a chunk of size %" PRId64 " bytes", write_val); - // Output the chunk itself. 
- // - // BZ# 54395 Note - we really should only do a - // block transfer if there is sizable amount of - // data (like we do for the case where we are - // removing chunked encoding in ChunkedHandler::transfer_bytes() - // However, I want to do this fix with as small a risk - // as possible so I'm leaving this issue alone for - // now - // - chunked_buffer->write(dechunked_reader, write_val); - chunked_size += write_val; - dechunked_reader->consume(write_val); - - // Output the trailing CRLF. - chunked_buffer->write("\r\n", 2); - chunked_size += 2; + // Output the chunk size. + if (write_val != max_chunk_size) { + int len = snprintf(tmp, sizeof(tmp), CHUNK_HEADER_FMT, write_val); + chunked_buffer->write(tmp, len); + chunked_size += len; + } else { + chunked_buffer->write(max_chunk_header, max_chunk_header_len); + chunked_size += max_chunk_header_len; } + + // Output the chunk itself. + // + // BZ# 54395 Note - we really should only do a + // block transfer if there is sizable amount of + // data (like we do for the case where we are + // removing chunked encoding in ChunkedHandler::transfer_bytes() + // However, I want to do this fix with as small a risk + // as possible so I'm leaving this issue alone for + // now + // + chunked_buffer->write(dechunked_reader, write_val); + chunked_size += write_val; + dechunked_reader->consume(write_val); + + // Output the trailing CRLF. 
+ chunked_buffer->write("\r\n", 2); + chunked_size += 2; } if (server_done) { @@ -445,11 +360,67 @@ HttpTunnelProducer::HttpTunnelProducer() vc(NULL), vc_handler(NULL), read_vio(NULL), read_buffer(NULL), buffer_start(NULL), vc_type(HT_HTTP_SERVER), chunking_action(TCA_PASSTHRU_DECHUNKED_CONTENT), do_chunking(false), do_dechunking(false), do_chunked_passthru(false), - init_bytes_done(0), nbytes(0), ntodo(0), bytes_read(0), handler_state(0), num_consumers(0), alive(false), - read_success(false), name(NULL) + init_bytes_done(0), nbytes(0), ntodo(0), bytes_read(0), + handler_state(0), num_consumers(0), alive(false), + read_success(false), flow_control_source(0), name(NULL) { } +uint64_t +HttpTunnelProducer::backlog(uint64_t limit) { + uint64_t zret = 0; + // Calculate the total backlog, the # of bytes inside ATS for this producer. + // We go all the way through each chain to the ending sink and take the maximum + // over those paths. Do need to be careful about loops which can occur. + for ( HttpTunnelConsumer* c = consumer_list.head ; c ; c = c->link.next ) { + if (c->alive && c->write_vio) { + uint64_t n = 0; + if (HT_TRANSFORM == c->vc_type) { + n += static_cast<TransformVCChain*>(c->vc)->backlog(limit); + } else { + IOBufferReader* r = c->write_vio->get_reader(); + if (r) { + n += static_cast<uint64_t>(r->read_avail()); + } + } + if (n >= limit) return n; + + if (!c->is_sink()) { + HttpTunnelProducer* dsp = c->self_producer; + if (dsp) { + n += dsp->backlog(); + } + } + if (n >= limit) return n; + if (n > zret) zret = n; + } + } + + if (chunked_handler.chunked_reader) { + zret += static_cast<uint64_t>(chunked_handler.chunked_reader->read_avail()); + } + + return zret; +} + +/* We set the producers in a flow chain specifically rather than + using a tunnel level variable in order to handle bi-directional + tunnels correctly. In such a case the flow control on producers is + not related so a single value for the tunnel won't work. 
+*/ +void +HttpTunnelProducer::set_throttle_src(HttpTunnelProducer* srcp) { + HttpTunnelProducer* p = this; + p->flow_control_source = srcp; + for ( HttpTunnelConsumer* c = consumer_list.head ; c ; c = c->link.next ) { + if (!c->is_sink()) { + p = c->self_producer; + if (p) + p->set_throttle_src(srcp); + } + } +} + HttpTunnelConsumer::HttpTunnelConsumer() : link(), producer(NULL), self_producer(NULL), vc_type(HT_HTTP_CLIENT), vc(NULL), buffer_reader(NULL), vc_handler(NULL), write_vio(NULL), skip_bytes(0), bytes_written(0), handler_state(0), alive(false), @@ -463,14 +434,31 @@ HttpTunnel::HttpTunnel() } void +HttpTunnel::init(HttpSM * sm_arg, ProxyMutex * amutex) +{ + HttpConfigParams* params = sm_arg->t_state.http_config_param; + sm = sm_arg; + active = false; + mutex = amutex; + SET_HANDLER(&HttpTunnel::main_handler); + flow_state.enabled_p = params->oride.flow_control_enabled; + if (params->oride.flow_low_water_mark > 0) + flow_state.low_water = params->oride.flow_low_water_mark; + if (params->oride.flow_high_water_mark > 0) + flow_state.high_water = params->oride.flow_high_water_mark; + // This should always be true, we handled default cases back in HttpConfig::reconfigure() + ink_assert(flow_state.low_water <= flow_state.high_water); +} + +void HttpTunnel::reset() { ink_assert(active == false); #ifdef DEBUG - for (int i = 0; i < MAX_PRODUCERS; i++) { + for (int i = 0; i < MAX_PRODUCERS; ++i) { ink_assert(producers[i].alive == false); } - for (int j = 0; j < MAX_CONSUMERS; j++) { + for (int j = 0; j < MAX_CONSUMERS; ++j) { ink_assert(consumers[j].alive == false); } #endif @@ -484,7 +472,7 @@ HttpTunnel::reset() void HttpTunnel::kill_tunnel() { - for (int i = 0; i < MAX_PRODUCERS; i++) { + for (int i = 0; i < MAX_PRODUCERS; ++i) { if (producers[i].vc != NULL) { chain_abort_all(&producers[i]); } @@ -499,7 +487,7 @@ HttpTunnel::kill_tunnel() HttpTunnelProducer * HttpTunnel::alloc_producer() { - for (int i = 0; i < MAX_PRODUCERS; i++) { + for (int i = 0; i < 
MAX_PRODUCERS; ++i) { if (producers[i].vc == NULL) { num_producers++; ink_assert(num_producers <= MAX_PRODUCERS); @@ -529,7 +517,7 @@ HttpTunnel::deallocate_buffers() { int num = 0; ink_release_assert(active == false); - for (int i = 0; i < MAX_PRODUCERS; i++) { + for (int i = 0; i < MAX_PRODUCERS; ++i) { if (producers[i].read_buffer != NULL) { ink_assert(producers[i].vc != NULL); free_MIOBuffer(producers[i].read_buffer); @@ -678,6 +666,16 @@ HttpTunnel::add_consumer(VConnection * vc, return c; } +void +HttpTunnel::chain(HttpTunnelConsumer* c, HttpTunnelProducer* p) +{ + p->self_consumer = c; + c->self_producer = p; + // If the flow is already throttled update the chained producer. + if (c->producer->is_throttled()) + p->set_throttle_src(c->producer->flow_control_source); +} + // void HttpTunnel::tunnel_run() // // Makes the tunnel go @@ -694,7 +692,7 @@ HttpTunnel::tunnel_run(HttpTunnelProducer * p_arg) ink_assert(active == false); - for (int i = 0; i < MAX_PRODUCERS; i++) { + for (int i = 0 ; i < MAX_PRODUCERS ; ++i) { p = producers + i; if (p->vc != NULL) { producer_run(p); @@ -952,7 +950,6 @@ HttpTunnel::producer_run(HttpTunnelProducer * p) p->buffer_start = NULL; } - int HttpTunnel::producer_handler_dechunked(int event, HttpTunnelProducer * p) { @@ -966,7 +963,8 @@ HttpTunnel::producer_handler_dechunked(int event, HttpTunnelProducer * p) case VC_EVENT_READ_COMPLETE: case HTTP_TUNNEL_EVENT_PRECOMPLETE: case VC_EVENT_EOS: - p->chunked_handler.last_server_event = event; + p->last_event = + p->chunked_handler.last_server_event = event; // TODO: Should we check the return code? 
p->chunked_handler.generate_chunked_content(); break; @@ -1001,7 +999,8 @@ HttpTunnel::producer_handler_chunked(int event, HttpTunnelProducer * p) return event; } - p->chunked_handler.last_server_event = event; + p->last_event = + p->chunked_handler.last_server_event = event; bool done = p->chunked_handler.process_chunked_content(); // If we couldn't understand the encoding, return @@ -1014,18 +1013,6 @@ HttpTunnel::producer_handler_chunked(int event, HttpTunnelProducer * p) // sense but no reenables follow return VC_EVENT_EOS; } - // If we are in a flow control state, there is still data in - // buffer so return READ_READY - if (p->read_vio && p->chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL) { - // INKqa05737 - We need force the server vc to - // disabled since the server may have sent the - // last chunk. When we go to process that last chunk, - // we will move the server to a keep alive state. Since - // we are prohibited from changing the buffer, we need - // make sure the iocore doesn't schedule a read - p->read_vio->nbytes = p->read_vio->ndone; - return VC_EVENT_READ_READY; - } switch (event) { case VC_EVENT_READ_READY: @@ -1077,6 +1064,8 @@ bool HttpTunnel::producer_handler(int event, HttpTunnelProducer * p) } } else if (p->do_dechunking || p->do_chunked_passthru) { event = producer_handler_chunked(event, p); + } else { + p->last_event = event; } //YTS Team, yamsat Plugin @@ -1185,6 +1174,53 @@ bool HttpTunnel::producer_handler(int event, HttpTunnelProducer * p) return sm_callback; } +bool +HttpTunnel::consumer_reenable(HttpTunnelConsumer* c) +{ + HttpTunnelProducer* p = c->producer; + HttpTunnelProducer* srcp = p->flow_control_source; + if (p->alive +#ifndef LAZY_BUF_ALLOC + && p->read_buffer->write_avail() > 0 +#endif + ) { + // Only do flow control if enabled and the producer is an external + // source. Otherwise disable by making the backlog zero. 
Because + // the backlog short cuts quit when the value is equal (or + // greater) to the target, we use strict comparison only for + // checking low water, otherwise the flow control can stall out. + uint64_t backlog = (flow_state.enabled_p && p->is_source()) + ? p->backlog(flow_state.high_water) + : 0; + + if (backlog >= flow_state.high_water) { + if (is_debug_tag_set("http_tunnel")) + Debug("http_tunnel", "Throttle %p %" PRId64 " / %" PRId64, p, backlog, p->backlog()); + p->throttle(); // p becomes srcp for future calls to this method + } else { + if (srcp && c->is_sink()) { + // Check if backlog is below low water - note we need to check + // against the source producer, not necessarily the producer + // for this consumer. We don't have to recompute the backlog + // if they are the same because we know low water <= high + // water so the value is sufficiently accurate. + if (srcp != p) + backlog = srcp->backlog(flow_state.low_water); + if (backlog < flow_state.low_water) { + if (is_debug_tag_set("http_tunnel")) + Debug("http_tunnel", "Unthrottle %p %" PRId64 " / %" PRId64, p, backlog, p->backlog()); + srcp->unthrottle(); + srcp->read_vio->reenable(); + // Kick source producer to get flow ... well, flowing. 
+ this->producer_handler(VC_EVENT_READ_READY, srcp); + } + } + p->read_vio->reenable(); + } + } + return p->is_throttled(); +} + // // bool HttpTunnel::consumer_handler(int event, HttpTunnelConsumer* p) // @@ -1200,6 +1236,7 @@ bool HttpTunnel::consumer_handler(int event, HttpTunnelConsumer * c) { bool sm_callback = false; HttpConsumerHandler jump_point; + HttpTunnelProducer* p = c->producer; Debug("http_tunnel", "[%" PRId64 "] consumer_handler [%s %s]", sm->sm_id, c->name, HttpDebugNames::get_event_name(event)); @@ -1207,31 +1244,7 @@ bool HttpTunnel::consumer_handler(int event, HttpTunnelConsumer * c) switch (event) { case VC_EVENT_WRITE_READY: - // Data consumed, reenable producer - if (c->producer->alive) { - if (c->producer->do_dechunking) { - // Because dechunking decouples the inbound and outbound - // buffers, we have to run special code handle the - // reenable - chunked_reenable(c->producer, this); - } else if (c->producer->do_chunking) { - add_chunked_reenable(c->producer, this); - } else { - /* - * Dont check for space availability. The - * net code adds more space if required. 
- */ - -#ifndef LAZY_BUF_ALLOC - if (c->producer->read_buffer->write_avail() > 0) { - c->producer->read_vio->reenable(); - } -#else - c->producer->read_vio->reenable(); -#endif - - } - } + this->consumer_reenable(c); break; case VC_EVENT_WRITE_COMPLETE: @@ -1262,13 +1275,20 @@ bool HttpTunnel::consumer_handler(int event, HttpTunnelConsumer * c) // the SM since the reenabling has the side effect // updating the buffer state for the VConnection // that is being reenabled - if (c->producer->alive && c->producer->read_vio + if (p->alive && p->read_vio #ifndef LAZY_BUF_ALLOC - && c->producer->read_buffer->write_avail() > 0 + && p->read_buffer->write_avail() > 0 #endif ) { - c->producer->read_vio->reenable(); + if (p->is_throttled()) + this->consumer_reenable(c); + else + p->read_vio->reenable(); } + // [amc] I don't think this happens but we'll leave a debug trap + // here just in case. + if (p->is_throttled()) + Debug("http_tunnel", "Special event %s on %p with flow control on", HttpDebugNames::get_event_name(event), p); break; case VC_EVENT_READ_READY: http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e768cb61/proxy/http/HttpTunnel.h ---------------------------------------------------------------------- diff --git a/proxy/http/HttpTunnel.h b/proxy/http/HttpTunnel.h index f9c4637..2de633c 100644 --- a/proxy/http/HttpTunnel.h +++ b/proxy/http/HttpTunnel.h @@ -175,6 +175,16 @@ struct HttpTunnelConsumer bool alive; bool write_success; const char *name; + + /** Check if this consumer is downstream from @a vc. + @return @c true if any producer in the tunnel eventually feeds + data to this consumer. + */ + bool is_downstream_from(VConnection* vc); + /** Check if this is a sink (final data destination). + @return @c true if data exits the ATS process at this consumer. 
+ */ + bool is_sink() const; }; struct HttpTunnelProducer @@ -202,12 +212,44 @@ struct HttpTunnelProducer int64_t ntodo; // what this vc needs to do int64_t bytes_read; // total bytes read from the vc int handler_state; // state used the handlers + int last_event; ///< Tracking for flow control restarts. int num_consumers; bool alive; bool read_success; + /// Flag and pointer for active flow control throttling. + /// If this is set, it points at the source producer that is under flow control. + /// If @c NULL then data flow is not being throttled. + HttpTunnelProducer* flow_control_source; const char *name; + + /** Get the largest number of bytes any consumer has not consumed. + Use @a limit if you only need to check if the backlog is at least @a limit. + @return The actual backlog or a number at least @a limit. + */ + uint64_t backlog( + uint64_t limit = INTU64_MAX ///< More than this is irrelevant + ); + /// Check if producer is original (to ATS) source of data. + /// @return @c true if this producer is the source of bytes from outside ATS. + bool is_source() const; + /// Throttle the flow. + void throttle(); + /// Unthrottle the flow. + void unthrottle(); + /// Check throttled state. + bool is_throttled() const; + + /** Set the flow control source producer for the flow. + This sets the value for this producer and all downstream producers. + @note This is the implementation for @c throttle and @c unthrottle. + @see throttle + @see unthrottle + */ + void set_throttle_src( + HttpTunnelProducer* srcp ///< Source producer of flow. + ); }; class PostDataBuffers @@ -229,6 +271,26 @@ class HttpTunnel:public Continuation { friend class HttpPagesHandler; friend class CoreUtils; + + /** Data for implementing flow control across a tunnel. + + The goal is to bound the amount of data buffered for a + transaction flowing through the tunnel to (roughly) between the + @a high_water and @a low_water water marks. Due to the chunky nature of data + flow this is always approximate. 
+ */ + struct FlowControl { + // Default value for high and low water marks. + static uint64_t const DEFAULT_WATER_MARK = 1<<16; + + uint64_t high_water; ///< Buffered data limit - throttle if more than this. + uint64_t low_water; ///< Unthrottle if less than this buffered. + bool enabled_p; ///< Flow control state (@c false means disabled). + + /// Default constructor. + FlowControl(); + }; + public: HttpTunnel(); @@ -237,7 +299,7 @@ public: void kill_tunnel(); bool is_tunnel_active() { return active; } bool is_tunnel_alive(); - bool is_there_cache_write(); + bool has_cache_writer(); // YTS Team, yamsat Plugin void copy_partial_post_data(); @@ -266,6 +328,7 @@ public: void tunnel_run(HttpTunnelProducer * p = NULL); int main_handler(int event, void *data); + bool consumer_reenable(HttpTunnelConsumer* c); bool consumer_handler(int event, HttpTunnelConsumer * c); bool producer_handler(int event, HttpTunnelProducer * p); int producer_handler_dechunked(int event, HttpTunnelProducer * p); @@ -277,6 +340,18 @@ public: void abort_cache_write_finish_others(HttpTunnelProducer * p); void append_message_to_producer_buffer(HttpTunnelProducer * p, const char *msg, int64_t msg_len); + /** Mark a producer and consumer as the same underlying object. + + This is used to chain producer/consumer pairs together to + indicate the data flows through them sequentially. The primary + example is a transform which serves as a consumer on the server + side and a producer on the cache/client side. + */ + void chain( + HttpTunnelConsumer* c, ///< Flow goes in here + HttpTunnelProducer* p ///< Flow comes back out here + ); + void close_vc(HttpTunnelProducer * p); void close_vc(HttpTunnelConsumer * c); @@ -301,6 +376,9 @@ private: bool active; + /// State data about flow control. 
+ FlowControl flow_state; + public: PostDataBuffers * postbuf; }; @@ -364,15 +442,6 @@ HttpTunnel::is_tunnel_alive() return tunnel_alive; } -inline void -HttpTunnel::init(HttpSM * sm_arg, ProxyMutex * amutex) -{ - sm = sm_arg; - active = false; - mutex = amutex; - SET_HANDLER(&HttpTunnel::main_handler); -} - inline HttpTunnelProducer * HttpTunnel::get_producer(VConnection * vc) { @@ -429,7 +498,7 @@ HttpTunnel::append_message_to_producer_buffer(HttpTunnelProducer * p, const char } inline bool -HttpTunnel::is_there_cache_write() +HttpTunnel::has_cache_writer() { for (int i = 0; i < MAX_CONSUMERS; i++) { if (consumers[i].vc_type == HT_CACHE_WRITE && consumers[i].vc != NULL) { @@ -438,4 +507,63 @@ HttpTunnel::is_there_cache_write() } return false; } + +inline bool +HttpTunnelConsumer::is_downstream_from(VConnection *vc) +{ + HttpTunnelProducer* p = producer; + HttpTunnelConsumer* c; + while (p) { + if (p->vc == vc) return true; + // The producer / consumer chain can contain a cycle in the case + // of a blind tunnel so give up if we find ourself (the original + // consumer). + c = p->self_consumer; + p = (c && c != this) ? c->producer : 0; + } + return false; +} + +inline bool +HttpTunnelConsumer::is_sink() const +{ + return HT_HTTP_CLIENT == vc_type || HT_CACHE_WRITE == vc_type; +} + +inline bool +HttpTunnelProducer::is_source() const +{ + // If a producer is marked as a client, then it's part of a bidirectional tunnel + // and so is an actual source of data. 
+ return HT_HTTP_SERVER == vc_type || HT_CACHE_READ == vc_type || HT_HTTP_CLIENT == vc_type; +} + +inline bool +HttpTunnelProducer::is_throttled() const +{ + return 0 != flow_control_source; +} + +inline void +HttpTunnelProducer::throttle() +{ + if (!this->is_throttled()) + this->set_throttle_src(this); +} + +inline void +HttpTunnelProducer::unthrottle() +{ + if (this->is_throttled()) + this->set_throttle_src(0); +} + +inline +HttpTunnel::FlowControl::FlowControl() + : high_water(DEFAULT_WATER_MARK) + , low_water(DEFAULT_WATER_MARK) + , enabled_p(false) +{ +} + #endif
