Updated Branches:
  refs/heads/master e9cd43b8a -> e768cb61c

TS-1496: Enable per transaction flow control


Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/e768cb61
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/e768cb61
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/e768cb61

Branch: refs/heads/master
Commit: e768cb61c335d8edafdc2cf17b4553cb490e49e4
Parents: e9cd43b
Author: Alan M. Carroll <[email protected]>
Authored: Fri Feb 22 19:54:11 2013 -0600
Committer: Alan M. Carroll <[email protected]>
Committed: Wed Jun 12 13:55:49 2013 -0500

----------------------------------------------------------------------
 CHANGES                         |   2 +
 iocore/eventsystem/I_IOBuffer.h |   7 +
 iocore/eventsystem/P_IOBuffer.h |  15 ++
 mgmt/RecordsConfig.cc           |   6 +
 proxy/InkAPI.cc                 |  18 ++
 proxy/InkAPITest.cc             |   4 +
 proxy/Transform.cc              |  27 ++-
 proxy/Transform.h               |  28 +++
 proxy/TransformInternal.h       |   7 +-
 proxy/api/ts/ts.h.in            |  13 +-
 proxy/http/HttpConfig.cc        |  19 ++
 proxy/http/HttpConfig.h         |   5 +-
 proxy/http/HttpDebugNames.cc    |   2 +
 proxy/http/HttpSM.cc            | 102 ++++++++--
 proxy/http/HttpSM.h             |   6 +
 proxy/http/HttpTunnel.cc        | 376 ++++++++++++++++++-----------------
 proxy/http/HttpTunnel.h         | 150 +++++++++++++-
 17 files changed, 572 insertions(+), 215 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e768cb61/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 065ab60..5f00fba 100644
--- a/CHANGES
+++ b/CHANGES
@@ -6,6 +6,8 @@
 
   *) [TS-1942] Remove username.cache configs, they are obsolete and long gone.
 
+  *) [TS-1496] Enable per transaction flow control.
+
 
   Changes with Apache Traffic Server 3.3.4
 

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e768cb61/iocore/eventsystem/I_IOBuffer.h
----------------------------------------------------------------------
diff --git a/iocore/eventsystem/I_IOBuffer.h b/iocore/eventsystem/I_IOBuffer.h
index 5b44e17..baa1c54 100644
--- a/iocore/eventsystem/I_IOBuffer.h
+++ b/iocore/eventsystem/I_IOBuffer.h
@@ -578,6 +578,12 @@ public:
   */
   int64_t read_avail();
 
+  /** Check if there is more than @a size bytes available to read.
+      @return @c true if more than @a size byte are available.
+  */
+  bool is_read_avail_more_than(int64_t size);
+
+
   /**
     Number of IOBufferBlocks with data in the block list. Returns the
     number of IOBufferBlocks on the block list with data remaining for
@@ -1071,6 +1077,7 @@ public:
     return !_writer;
   }
   int64_t max_read_avail();
+
   int max_block_count();
   void check_add_block();
 

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e768cb61/iocore/eventsystem/P_IOBuffer.h
----------------------------------------------------------------------
diff --git a/iocore/eventsystem/P_IOBuffer.h b/iocore/eventsystem/P_IOBuffer.h
index 98bdda2..261aa1f 100644
--- a/iocore/eventsystem/P_IOBuffer.h
+++ b/iocore/eventsystem/P_IOBuffer.h
@@ -633,6 +633,21 @@ IOBufferReader::read_avail()
   return t;
 }
 
+inline bool
+IOBufferReader::is_read_avail_more_than(int64_t size)
+{
+  int64_t t = -start_offset;
+  IOBufferBlock* b = block;
+  while (b) {
+    t += b->read_avail();
+    if (t > size) {
+      return true;
+    }
+    b = b->next;
+  }
+  return false;
+}
+
 TS_INLINE void
 IOBufferReader::consume(int64_t n)
 {

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e768cb61/mgmt/RecordsConfig.cc
----------------------------------------------------------------------
diff --git a/mgmt/RecordsConfig.cc b/mgmt/RecordsConfig.cc
index 0ab0b94..fbc101a 100644
--- a/mgmt/RecordsConfig.cc
+++ b/mgmt/RecordsConfig.cc
@@ -429,6 +429,12 @@ RecordElement RecordsConfig[] = {
   ,
   {RECT_CONFIG, "proxy.config.http.chunking.size", RECD_INT, "4096", 
RECU_DYNAMIC, RR_NULL, RECC_NULL, NULL, RECA_NULL}
   ,
+  {RECT_CONFIG, "proxy.config.http.flow_control.enabled", RECD_INT, "0", 
RECU_DYNAMIC, RR_NULL, RECC_NULL, NULL, RECA_NULL}
+  ,
+  {RECT_CONFIG, "proxy.config.http.flow_control.high_water", RECD_INT, "0", 
RECU_DYNAMIC, RR_NULL, RECC_NULL, NULL, RECA_NULL}
+  ,
+  {RECT_CONFIG, "proxy.config.http.flow_control.low_water", RECD_INT, "0", 
RECU_DYNAMIC, RR_NULL, RECC_NULL, NULL, RECA_NULL}
+  ,
   {RECT_CONFIG, "proxy.config.http.session_auth_cache_keep_alive_enabled", 
RECD_INT, "1", RECU_DYNAMIC, RR_NULL, RECC_NULL, NULL, RECA_NULL}
   ,
   //       # Send http11 requests

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e768cb61/proxy/InkAPI.cc
----------------------------------------------------------------------
diff --git a/proxy/InkAPI.cc b/proxy/InkAPI.cc
index 8004a1e..f598b47 100644
--- a/proxy/InkAPI.cc
+++ b/proxy/InkAPI.cc
@@ -7583,6 +7583,17 @@ _conf_to_memberp(TSOverridableConfigKey conf, HttpSM* 
sm, OverridableDataType *t
     typ = OVERRIDABLE_TYPE_INT;
     ret = &sm->t_state.txn_conf->http_chunking_size;
     break;
+  case TS_CONFIG_HTTP_FLOW_CONTROL_ENABLED:
+    ret = &sm->t_state.txn_conf->flow_control_enabled;
+    break;
+  case TS_CONFIG_HTTP_FLOW_CONTROL_HIGH_WATER_MARK:
+    typ = OVERRIDABLE_TYPE_INT;
+    ret = &sm->t_state.txn_conf->flow_high_water_mark;
+    break;
+  case TS_CONFIG_HTTP_FLOW_CONTROL_LOW_WATER_MARK:
+    typ = OVERRIDABLE_TYPE_INT;
+    ret = &sm->t_state.txn_conf->flow_low_water_mark;
+    break;
 
     // This helps avoiding compiler warnings, yet detect unhandled enum 
members.
   case TS_CONFIG_NULL:
@@ -7816,6 +7827,9 @@ TSHttpTxnConfigFind(const char* name, int length, 
TSOverridableConfigKey *conf,
     case 'd':
       if (!strncmp(name, "proxy.config.http.server_tcp_init_cwnd", length))
         cnf = TS_CONFIG_HTTP_SERVER_TCP_INIT_CWND;
+      else if (!strncmp(name, "proxy.config.http.flow_control.enabled", 
length))
+        cnf = TS_CONFIG_HTTP_FLOW_CONTROL_ENABLED;
+      break;
       break;
     case 's':
       if (!strncmp(name, "proxy.config.http.send_http11_requests", length))
@@ -7856,6 +7870,8 @@ TSHttpTxnConfigFind(const char* name, int length, 
TSOverridableConfigKey *conf,
         cnf = TS_CONFIG_URL_REMAP_PRISTINE_HOST_HDR;
       else if (!strncmp(name, "proxy.config.http.insert_request_via_str", 
length))
         cnf = TS_CONFIG_HTTP_INSERT_REQUEST_VIA_STR;
+      else if (!strncmp(name, "proxy.config.http.flow_control.low_water", 
length))
+        cnf = TS_CONFIG_HTTP_FLOW_CONTROL_LOW_WATER_MARK;
       break;
     case 's':
       if (!strncmp(name, "proxy.config.http.origin_max_connections", length))
@@ -7887,6 +7903,8 @@ TSHttpTxnConfigFind(const char* name, int length, 
TSOverridableConfigKey *conf,
     case 'r':
       if (!strncmp(name, "proxy.config.http.insert_response_via_str", length))
         cnf = TS_CONFIG_HTTP_INSERT_RESPONSE_VIA_STR;
+      else if (!strncmp(name, "proxy.config.http.flow_control.high_water", 
length))
+        cnf = TS_CONFIG_HTTP_FLOW_CONTROL_HIGH_WATER_MARK;
       break;
     }
     break;

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e768cb61/proxy/InkAPITest.cc
----------------------------------------------------------------------
diff --git a/proxy/InkAPITest.cc b/proxy/InkAPITest.cc
index 011e2d3..a1cdf8e 100644
--- a/proxy/InkAPITest.cc
+++ b/proxy/InkAPITest.cc
@@ -5599,6 +5599,7 @@ typedef enum
   ORIG_TS_HTTP_CACHE_LOOKUP_COMPLETE_HOOK,
   ORIG_TS_HTTP_PRE_REMAP_HOOK,
   ORIG_TS_HTTP_POST_REMAP_HOOK,
+  ORIG_TS_HTTP_RESPONSE_CLIENT_HOOK,
   ORIG_TS_HTTP_LAST_HOOK
 } ORIG_TSHttpHookID;
 
@@ -7451,6 +7452,9 @@ const char *SDK_Overridable_Configs[TS_CONFIG_LAST_ENTRY] 
= {
   "proxy.config.net.sock_packet_tos_out",
   "proxy.config.http.insert_age_in_response",
   "proxy.config.http.chunking.size",
+  "proxy.config.http.flow_control.enabled",
+  "proxy.config.http.flow_control.low_water",
+  "proxy.config.http.flow_control.high_water"
 };
 
 REGRESSION_TEST(SDK_API_OVERRIDABLE_CONFIGS) (RegressionTest * test, int 
atype, int *pstatus)

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e768cb61/proxy/Transform.cc
----------------------------------------------------------------------
diff --git a/proxy/Transform.cc b/proxy/Transform.cc
index 80f3dff..da3e51c 100644
--- a/proxy/Transform.cc
+++ b/proxy/Transform.cc
@@ -398,7 +398,7 @@ TransformTerminus::reenable(VIO *vio)
   -------------------------------------------------------------------------*/
 
 TransformVConnection::TransformVConnection(Continuation *cont, APIHook *hooks)
-:VConnection(cont->mutex), m_cont(cont), m_terminus(this), m_closed(0)
+:TransformVCChain(cont->mutex), m_cont(cont), m_terminus(this), m_closed(0)
 {
   INKVConnInternal *xform;
 
@@ -506,6 +506,31 @@ TransformVConnection::reenable(VIO *vio)
   ink_assert(!"not reached");
 }
 
+/*-------------------------------------------------------------------------
+  -------------------------------------------------------------------------*/
+
+uint64_t
+TransformVConnection::backlog(uint64_t limit)
+{
+  uint64_t b = 0; // backlog
+  VConnection* raw_vc = m_transform;
+  MIOBuffer* w;
+  while (raw_vc && raw_vc != &m_terminus) {
+    INKVConnInternal* vc = static_cast<INKVConnInternal*>(raw_vc);
+    if (0 != (w = vc->m_read_vio.buffer.writer()))
+      b += w->max_read_avail();
+    if (b >= limit) return b;
+    raw_vc = vc->m_output_vc;
+  }
+  if (0 != (w = m_terminus.m_read_vio.buffer.writer()))
+    b += w->max_read_avail();
+  if (b >= limit) return b;
+
+  IOBufferReader* r = m_terminus.m_write_vio.get_reader();
+  if (r)
+    b += r->read_avail();
+  return b;
+}
 
 /*-------------------------------------------------------------------------
   -------------------------------------------------------------------------*/

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e768cb61/proxy/Transform.h
----------------------------------------------------------------------
diff --git a/proxy/Transform.h b/proxy/Transform.h
index bbe6bab..26ac2be 100644
--- a/proxy/Transform.h
+++ b/proxy/Transform.h
@@ -60,6 +60,34 @@ public:
 };
 #endif
 
+/** A protocol class.
+    This provides transform VC specific methods for external access
+    without exposing internals or requiring extra includes.
+*/
+class TransformVCChain : public VConnection
+{
+ protected:
+  /// Required constructor
+  TransformVCChain(ProxyMutex* m);
+ public:
+  /** Compute the backlog.  This is the amount of data ready to read
+      for each element of the chain.  If @a limit is non-negative then
+      the method will return as soon as the computed backlog is at
+      least that large. This provides for more efficient checking if
+      the caller is interested only in whether the backlog is at least
+      @a limit. The default is to accurately compute the backlog.
+  */
+  virtual uint64_t backlog(
+                          uint64_t limit = INTU64_MAX ///< Maximum value of 
interest
+                         ) = 0;
+};
+
+inline
+TransformVCChain::TransformVCChain(ProxyMutex* m)
+                : VConnection(m)
+{
+}
+
 ///////////////////////////////////////////////////////////////////
 /// RangeTransform implementation
 /// handling Range requests from clients

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e768cb61/proxy/TransformInternal.h
----------------------------------------------------------------------
diff --git a/proxy/TransformInternal.h b/proxy/TransformInternal.h
index 8b03529..873561e 100644
--- a/proxy/TransformInternal.h
+++ b/proxy/TransformInternal.h
@@ -59,7 +59,7 @@ public:
 };
 
 
-class TransformVConnection:public VConnection
+class TransformVConnection:public TransformVCChain
 {
 public:
   TransformVConnection(Continuation * cont, APIHook * hooks);
@@ -74,6 +74,11 @@ public:
 
   void reenable(VIO * vio);
 
+  /** Compute the backlog.
+      @return The actual backlog, or a value at least @a limit.
+  */
+  virtual uint64_t backlog(uint64_t limit = INTU64_MAX);
+
 public:
   VConnection * m_transform;
   Continuation *m_cont;

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e768cb61/proxy/api/ts/ts.h.in
----------------------------------------------------------------------
diff --git a/proxy/api/ts/ts.h.in b/proxy/api/ts/ts.h.in
index 6741d79..c533068 100644
--- a/proxy/api/ts/ts.h.in
+++ b/proxy/api/ts/ts.h.in
@@ -210,8 +210,10 @@ extern "C"
       continuation for a particular hook are:
 
       TSHttpHookAdd: adds a global hook. You can globally add
-      any hook except for TS_HTTP_REQUEST_TRANSFORM_HOOK and
-      TS_HTTP_RESPONSE_TRANSFORM_HOOK.
+      any hook except for
+       - TS_HTTP_REQUEST_TRANSFORM_HOOK
+       - TS_HTTP_RESPONSE_TRANSFORM_HOOK
+       - TS_HTTP_RESPONSE_CLIENT_HOOK
 
       The following hooks can ONLY be added globally:
        - TS_HTTP_SELECT_ALT_HOOK
@@ -228,6 +230,7 @@ extern "C"
        - TS_HTTP_SEND_RESPONSE_HDR_HOOK
        - TS_HTTP_REQUEST_TRANSFORM_HOOK
        - TS_HTTP_RESPONSE_TRANSFORM_HOOK
+       - TS_HTTP_RESPONSE_CLIENT_HOOK
        - TS_HTTP_TXN_START_HOOK
        - TS_HTTP_TXN_CLOSE_HOOK
 
@@ -268,6 +271,7 @@ extern "C"
     TS_HTTP_CACHE_LOOKUP_COMPLETE_HOOK,
     TS_HTTP_PRE_REMAP_HOOK,
     TS_HTTP_POST_REMAP_HOOK,
+    TS_HTTP_RESPONSE_CLIENT_HOOK,
     TS_HTTP_LAST_HOOK
   } TSHttpHookID;
   #define TS_HTTP_READ_REQUEST_PRE_REMAP_HOOK TS_HTTP_PRE_REMAP_HOOK  /* 
backwards compat */
@@ -612,7 +616,10 @@ extern "C"
     TS_CONFIG_NET_SOCK_PACKET_TOS_OUT,
     TS_CONFIG_HTTP_INSERT_AGE_IN_RESPONSE,
     TS_CONFIG_HTTP_CHUNKING_SIZE,
-    TS_CONFIG_LAST_ENTRY
+    TS_CONFIG_HTTP_FLOW_CONTROL_ENABLED,
+    TS_CONFIG_HTTP_FLOW_CONTROL_LOW_WATER_MARK,
+    TS_CONFIG_HTTP_FLOW_CONTROL_HIGH_WATER_MARK,
+    TS_CONFIG_LAST_ENTRY,
   } TSOverridableConfigKey;
 
   /* The TASK pool of threads is the primary method of off-loading 
continuations from the

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e768cb61/proxy/http/HttpConfig.cc
----------------------------------------------------------------------
diff --git a/proxy/http/HttpConfig.cc b/proxy/http/HttpConfig.cc
index 21f6fd0..a1f7361 100644
--- a/proxy/http/HttpConfig.cc
+++ b/proxy/http/HttpConfig.cc
@@ -1181,6 +1181,9 @@ HttpConfig::startup()
   HttpEstablishStaticConfigByte(c.oride.keep_alive_enabled_out, 
"proxy.config.http.keep_alive_enabled_out");
   HttpEstablishStaticConfigByte(c.oride.chunking_enabled, 
"proxy.config.http.chunking_enabled");
   HttpEstablishStaticConfigLongLong(c.oride.http_chunking_size, 
"proxy.config.http.chunking.size");
+  HttpEstablishStaticConfigByte(c.oride.flow_control_enabled, 
"proxy.config.http.flow_control.enabled");
+  HttpEstablishStaticConfigLongLong(c.oride.flow_high_water_mark, 
"proxy.config.http.flow_control.high_water");
+  HttpEstablishStaticConfigLongLong(c.oride.flow_low_water_mark, 
"proxy.config.http.flow_control.low_water");
   HttpEstablishStaticConfigByte(c.session_auth_cache_keep_alive_enabled,
                                 
"proxy.config.http.session_auth_cache_keep_alive_enabled");
   HttpEstablishStaticConfigLongLong(c.origin_server_pipeline, 
"proxy.config.http.origin_server_pipeline");
@@ -1456,6 +1459,22 @@ HttpConfig::reconfigure()
   params->oride.keep_alive_enabled_out = 
INT_TO_BOOL(m_master.oride.keep_alive_enabled_out);
   params->oride.chunking_enabled = 
INT_TO_BOOL(m_master.oride.chunking_enabled);
   params->oride.http_chunking_size = m_master.oride.http_chunking_size;
+
+  params->oride.flow_control_enabled = 
INT_TO_BOOL(m_master.oride.flow_control_enabled);
+  params->oride.flow_high_water_mark = m_master.oride.flow_high_water_mark;
+  params->oride.flow_low_water_mark = m_master.oride.flow_low_water_mark;
+  // If not set (zero) then make values the same.
+  if (params->oride.flow_low_water_mark <= 0)
+    params->oride.flow_low_water_mark = params->oride.flow_high_water_mark;
+  if (params->oride.flow_high_water_mark <= 0)
+    params->oride.flow_high_water_mark = params->oride.flow_low_water_mark;
+  if (params->oride.flow_high_water_mark < params->oride.flow_low_water_mark) {
+    Warning("Flow control low water mark is greater than high water mark, flow 
control disabled");
+    params->oride.flow_control_enabled = 0;
+    // zero means "hardwired default" when actually used.
+    params->oride.flow_high_water_mark = params->oride.flow_low_water_mark = 0;
+  }
+
   params->session_auth_cache_keep_alive_enabled = 
INT_TO_BOOL(m_master.session_auth_cache_keep_alive_enabled);
   params->origin_server_pipeline = m_master.origin_server_pipeline;
   params->user_agent_pipeline = m_master.user_agent_pipeline;

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e768cb61/proxy/http/HttpConfig.h
----------------------------------------------------------------------
diff --git a/proxy/http/HttpConfig.h b/proxy/http/HttpConfig.h
index 024154d..22e2c92 100644
--- a/proxy/http/HttpConfig.h
+++ b/proxy/http/HttpConfig.h
@@ -431,7 +431,7 @@ struct OverridableHttpConfigParams {
        freshness_fuzz_time(0), freshness_fuzz_min_time(0),
        max_cache_open_read_retries(0), cache_open_read_retry_time(0),
        background_fill_active_timeout(0),
-       http_chunking_size(0),
+       http_chunking_size(0), flow_high_water_mark(0), flow_low_water_mark(0),
 
        // Strings / floats must come last
        proxy_response_server_string(NULL), proxy_response_server_string_len(0),
@@ -506,6 +506,7 @@ struct OverridableHttpConfigParams {
   //  DOC IN CACHE NO DNS//
   //////////////////////
   MgmtByte doc_in_cache_skip_dns;
+  MgmtByte flow_control_enabled;
 
   MgmtInt negative_caching_lifetime;
 
@@ -567,6 +568,8 @@ struct OverridableHttpConfigParams {
   MgmtInt background_fill_active_timeout;
 
   MgmtInt http_chunking_size; // Maximum chunk size for chunked output.
+  MgmtInt flow_high_water_mark; ///< Flow control high water mark.
+  MgmtInt flow_low_water_mark; ///< Flow control low water mark.
 
   // IMPORTANT: Here comes all strings / floats configs.
 

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e768cb61/proxy/http/HttpDebugNames.cc
----------------------------------------------------------------------
diff --git a/proxy/http/HttpDebugNames.cc b/proxy/http/HttpDebugNames.cc
index 3703283..c296dde 100644
--- a/proxy/http/HttpDebugNames.cc
+++ b/proxy/http/HttpDebugNames.cc
@@ -486,6 +486,8 @@ HttpDebugNames::get_api_hook_name(TSHttpHookID t)
     return "TS_HTTP_PRE_REMAP_HOOK";
   case TS_HTTP_POST_REMAP_HOOK:
     return "TS_HTTP_POST_REMAP_HOOK";
+  case TS_HTTP_RESPONSE_CLIENT_HOOK:
+    return "TS_HTTP_RESPONSE_CLIENT_HOOK";
   case TS_HTTP_LAST_HOOK:
     return "TS_HTTP_LAST_HOOK";
   }

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e768cb61/proxy/http/HttpSM.cc
----------------------------------------------------------------------
diff --git a/proxy/http/HttpSM.cc b/proxy/http/HttpSM.cc
index 38a489f..8035b38 100644
--- a/proxy/http/HttpSM.cc
+++ b/proxy/http/HttpSM.cc
@@ -319,7 +319,8 @@ HttpSM::HttpSM()
     ua_raw_buffer_reader(NULL),
     server_entry(NULL), server_session(NULL), shared_session_retries(0),
     server_buffer_reader(NULL),
-    transform_info(), post_transform_info(), second_cache_sm(NULL),
+    transform_info(), post_transform_info(), has_active_plugin_agents(false),
+    second_cache_sm(NULL),
     default_handler(NULL), pending_action(NULL), historical_action(NULL),
     last_action(HttpTransact::STATE_MACHINE_ACTION_UNDEFINED),
     client_request_hdr_bytes(0), client_request_body_bytes(0),
@@ -2966,9 +2967,10 @@ HttpSM::is_bg_fill_necessary(HttpTunnelConsumer * c)
 
   // There must be another consumer for it to worthwhile to
   //  set up a background fill
-  if (c->producer->num_consumers > 1 &&
-      (c->producer->vc_type == HT_HTTP_SERVER  || c->producer->vc_type == 
HT_TRANSFORM) &&
+  if (((c->producer->num_consumers > 1 && c->producer->vc_type == 
HT_HTTP_SERVER) ||
+       (c->producer->num_consumers > 1 && c->producer->vc_type == 
HT_TRANSFORM)) &&
       c->producer->alive == true) {
+
     // If threshold is 0.0 or negative then do background
     //   fill regardless of the content length.  Since this
     //   is floating point just make sure the number is near zero
@@ -2988,7 +2990,7 @@ HttpSM::is_bg_fill_necessary(HttpTunnelConsumer * c)
       if (pDone <= 1.0 && pDone > t_state.txn_conf->background_fill_threshold) 
{
         return true;
       } else {
-        DebugSM("http", "[%" PRId64 "] no background.  Only %%%f done", sm_id, 
pDone);
+        DebugSM("http", "[%" PRId64 "] no background.  Only %%%f of %%%f done 
[%" PRId64 " / %" PRId64" ]", sm_id, pDone, 
t_state.txn_conf->background_fill_threshold, ua_body_done, ua_cl);
       }
 
     }
@@ -3027,8 +3029,8 @@ HttpSM::tunnel_handler_ua(int event, HttpTunnelConsumer * 
c)
 
       // There is another consumer (cache write) so
       //  detach the user agent
-      ink_assert(server_entry->vc == c->producer->vc);
-      ink_assert(server_session == c->producer->vc);
+      ink_assert(server_entry->vc == server_session);
+      ink_assert(c->is_downstream_from(server_session));
       server_session->get_netvc()->
         
set_active_timeout(HRTIME_SECONDS(t_state.txn_conf->background_fill_active_timeout));
     } else {
@@ -3690,6 +3692,35 @@ HttpSM::tunnel_handler_transform_read(int event, 
HttpTunnelProducer * p)
 }
 
 int
+HttpSM::tunnel_handler_plugin_agent(int event, HttpTunnelConsumer * c)
+{
+  STATE_ENTER(&HttpSM::tunnel_handler_plugin_client, event);
+
+  switch (event) {
+  case VC_EVENT_ERROR:
+    c->vc->do_io_close(EHTTP_ERROR); // close up
+    // Signal producer if we're the last consumer.
+    if (c->producer->alive && c->producer->num_consumers == 1) {
+      tunnel.producer_handler(HTTP_TUNNEL_EVENT_CONSUMER_DETACH, c->producer);
+    }
+    break;
+  case VC_EVENT_EOS:
+    if (c->producer->alive && c->producer->num_consumers == 1) {
+      tunnel.producer_handler(HTTP_TUNNEL_EVENT_CONSUMER_DETACH, c->producer);
+    }
+    // FALLTHROUGH
+  case VC_EVENT_WRITE_COMPLETE:
+    c->write_success = true;
+    c->vc->do_io(VIO::CLOSE);
+    break;
+  default:
+    ink_release_assert(0);
+  }
+
+  return 0;
+}
+
+int
 HttpSM::state_srv_lookup(int event, void *data)
 {
   STATE_ENTER(&HttpSM::state_srv_lookup, event);
@@ -4285,7 +4316,7 @@ HttpSM::do_cache_prepare_write()
 inline void
 HttpSM::do_cache_prepare_write_transform()
 {
-  if (cache_sm.cache_write_vc != NULL || tunnel.is_there_cache_write())
+  if (cache_sm.cache_write_vc != NULL || tunnel.has_cache_writer())
     do_cache_prepare_action(&transform_cache_sm, NULL, false, true);
   else
     do_cache_prepare_action(&transform_cache_sm, NULL, false);
@@ -4370,7 +4401,6 @@ HttpSM::do_cache_prepare_action(HttpCacheSM * c_sm, 
CacheHTTPInfo * object_read_
   }
 }
 
-
 //////////////////////////////////////////////////////////////////////////
 //
 //  HttpSM::do_http_server_open()
@@ -4566,7 +4596,6 @@ HttpSM::do_http_server_open(bool raw)
     } else if (ua_session->f_outbound_transparent) {
       opt.addr_binding = NetVCOptions::FOREIGN_ADDR;
       opt.local_ip = t_state.client_info.addr;
-
       /* If the connection is server side transparent, we can bind to the
          port that the client chose instead of randomly assigning one at
          the proxy.  This is controlled by the 'use_client_source_port'
@@ -5116,8 +5145,7 @@ HttpSM::setup_transform_to_server_transfer()
                                               
&HttpSM::tunnel_handler_transform_read,
                                               HT_TRANSFORM,
                                               "post transform");
-  p->self_consumer = c;
-  c->self_producer = p;
+  tunnel.chain(c,p);
   post_transform_info.entry->in_tunnel = true;
 
   tunnel.add_consumer(server_entry->vc,
@@ -6000,14 +6028,15 @@ HttpSM::setup_transfer_from_transform()
                                               
&HttpSM::tunnel_handler_transform_read,
                                               HT_TRANSFORM,
                                               "transform read");
-  p->self_consumer = c;
-  c->self_producer = p;
+  tunnel.chain(c, p);
 
   tunnel.add_consumer(ua_entry->vc, transform_info.vc, 
&HttpSM::tunnel_handler_ua, HT_HTTP_CLIENT, "user agent");
 
   transform_info.entry->in_tunnel = true;
   ua_entry->in_tunnel = true;
 
+  this->setup_plugin_agents(p);
+
   if ( t_state.client_info.receive_chunked_response ) {
     tunnel.set_producer_chunking_action(p, client_response_hdr_bytes, 
TCA_CHUNK_CONTENT);
     tunnel.set_producer_chunking_size(p, t_state.txn_conf->http_chunking_size);
@@ -6037,8 +6066,7 @@ HttpSM::setup_transfer_from_transform_to_cache_only()
                                               
&HttpSM::tunnel_handler_transform_read,
                                               HT_TRANSFORM,
                                               "transform read");
-  p->self_consumer = c;
-  c->self_producer = p;
+  tunnel.chain(c, p);
 
   transform_info.entry->in_tunnel = true;
 
@@ -6108,7 +6136,10 @@ HttpSM::setup_server_transfer()
       action = TCA_PASSTHRU_DECHUNKED_CONTENT;
   } else {
     if (t_state.current.server->transfer_encoding != 
HttpTransact::CHUNKED_ENCODING)
-      action = TCA_CHUNK_CONTENT;
+      if (t_state.client_info.http_version == HTTPVersion(0, 9))
+        action = TCA_PASSTHRU_DECHUNKED_CONTENT; // send as-is
+      else
+        action = TCA_CHUNK_CONTENT;
     else
       action = TCA_PASSTHRU_CHUNKED_CONTENT;
   }
@@ -6141,6 +6172,8 @@ HttpSM::setup_server_transfer()
   ua_entry->in_tunnel = true;
   server_entry->in_tunnel = true;
 
+  this->setup_plugin_agents(p);
+
   // If the incoming server response is chunked and the client does not
   // expect a chunked response, then dechunk it.  Otherwise, if the
   // incoming response is not chunked and the client expects a chunked
@@ -6257,10 +6290,8 @@ HttpSM::setup_blind_tunnel(bool send_response_hdr)
                              &HttpSM::tunnel_handler_ssl_consumer, 
HT_HTTP_SERVER, "http server - tunnel");
 
   // Make the tunnel aware that the entries are bi-directional
-  p_os->self_consumer = c_os;
-  p_ua->self_consumer = c_ua;
-  c_ua->self_producer = p_ua;
-  c_os->self_producer = p_os;
+  tunnel.chain(c_os, p_os);
+  tunnel.chain(c_ua, p_ua);
 
   ua_entry->in_tunnel = true;
   server_entry->in_tunnel = true;
@@ -6268,6 +6299,20 @@ HttpSM::setup_blind_tunnel(bool send_response_hdr)
   tunnel.tunnel_run();
 }
 
+void
+HttpSM::setup_plugin_agents(HttpTunnelProducer* p)
+{
+  APIHook* agent = txn_hook_get(TS_HTTP_RESPONSE_CLIENT_HOOK);
+  has_active_plugin_agents = agent != 0;
+  while (agent) {
+    INKVConnInternal* contp = static_cast<INKVConnInternal*>(agent->m_cont);
+    tunnel.add_consumer(contp, p->vc, &HttpSM::tunnel_handler_plugin_agent, 
HT_HTTP_CLIENT, "plugin agent");
+    // We don't put these in the SM VC table because the tunnel
+    // will clean them up in do_io_close().
+    agent = agent->next();
+  }
+}
+
 inline void
 HttpSM::transform_cleanup(TSHttpHookID hook, HttpTransformInfo * info)
 {
@@ -6281,6 +6326,22 @@ HttpSM::transform_cleanup(TSHttpHookID hook, 
HttpTransformInfo * info)
   }
 }
 
+void
+HttpSM::plugin_agents_cleanup()
+{
+  // If this is set then all of the plugin agent VCs were put in
+  // the VC table and cleaned up there. This handles the case where
+  // something went wrong early.
+  if (!has_active_plugin_agents) {
+    APIHook* agent = txn_hook_get(TS_HTTP_RESPONSE_CLIENT_HOOK);
+    while (agent) {
+      INKVConnInternal* contp = static_cast<INKVConnInternal*>(agent->m_cont);
+      contp->do_io_close();
+      agent = agent->next();
+    }
+  }
+}
+
 //////////////////////////////////////////////////////////////////////////
 //
 //  HttpSM::kill_this()
@@ -6326,6 +6387,7 @@ HttpSM::kill_this()
     if (hooks_set) {
       transform_cleanup(TS_HTTP_RESPONSE_TRANSFORM_HOOK, &transform_info);
       transform_cleanup(TS_HTTP_REQUEST_TRANSFORM_HOOK, &post_transform_info);
+      plugin_agents_cleanup();
     }
     // It's also possible that the plugin_tunnel vc was never
     //   executed due to not contacting the server

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e768cb61/proxy/http/HttpSM.h
----------------------------------------------------------------------
diff --git a/proxy/http/HttpSM.h b/proxy/http/HttpSM.h
index 81e1e07..d05fb4d 100644
--- a/proxy/http/HttpSM.h
+++ b/proxy/http/HttpSM.h
@@ -322,6 +322,9 @@ protected:
 
   HttpTransformInfo transform_info;
   HttpTransformInfo post_transform_info;
+  /// Set if plugin client / user agents are active.
+  /// Need primarily for cleanup.
+  bool has_active_plugin_agents;
 
   HttpCacheSM cache_sm;
   HttpCacheSM transform_cache_sm;
@@ -397,6 +400,7 @@ protected:
   int tunnel_handler_ssl_consumer(int event, HttpTunnelConsumer * p);
   int tunnel_handler_transform_write(int event, HttpTunnelConsumer * c);
   int tunnel_handler_transform_read(int event, HttpTunnelProducer * p);
+  int tunnel_handler_plugin_agent(int event, HttpTunnelConsumer * c);
 
   void do_hostdb_lookup();
   void do_hostdb_reverse_lookup();
@@ -458,6 +462,7 @@ protected:
   HttpTunnelProducer *setup_transfer_from_transform();
   HttpTunnelProducer *setup_cache_transfer_to_transform();
   HttpTunnelProducer *setup_transfer_from_transform_to_cache_only();
+  void setup_plugin_agents(HttpTunnelProducer* p);
 
   HttpTransact::StateMachineAction_t last_action;
   int (HttpSM::*m_last_state) (int event, void *data);
@@ -517,6 +522,7 @@ protected:
   void update_stats();
   void transform_cleanup(TSHttpHookID hook, HttpTransformInfo * info);
   bool is_transparent_passthrough_allowed();
+  void plugin_agents_cleanup();
 
 public:
   LINK(HttpSM, debug_link);

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e768cb61/proxy/http/HttpTunnel.cc
----------------------------------------------------------------------
diff --git a/proxy/http/HttpTunnel.cc b/proxy/http/HttpTunnel.cc
index d527a30..8e8ba53 100644
--- a/proxy/http/HttpTunnel.cc
+++ b/proxy/http/HttpTunnel.cc
@@ -37,8 +37,6 @@
 #include "HttpDebugNames.h"
 #include "ParseRules.h"
 
-
-static const int max_chunked_ahead_bytes = 1 << 15;
 static const int max_chunked_ahead_blocks = 128;
 static const int min_block_transfer_bytes = 256;
 static char const * const CHUNK_HEADER_FMT = "%" PRIx64"\r\n";
@@ -47,81 +45,18 @@ static char const * const CHUNK_HEADER_FMT = "%" 
PRIx64"\r\n";
 // a block in the input stream.
 static int const CHUNK_IOBUFFER_SIZE_INDEX = MIN_IOBUFFER_SIZE;
 
-static void
-chunked_reenable(HttpTunnelProducer * p, HttpTunnel * tunnel)
-{
-
-  // FIX ME: still need to deal with huge chunk sizes.  If a chunk
-  //    is 1GB, we will currently buffer the whole thing
-
-  if (p->chunked_handler.state != ChunkedHandler::CHUNK_FLOW_CONTROL) {
-    p->read_vio->reenable();
-  } else {
-    // If we are in are in the flow control, there's data in
-    //   the incoming buffer that we haven't processed yet
-    //   Only process it if we determine the client isn't overflowed
-    MIOBuffer *dbuf = p->chunked_handler.dechunked_buffer;
-
-    if (dbuf->max_read_avail() < max_chunked_ahead_bytes && 
dbuf->max_block_count() < max_chunked_ahead_blocks) {
-      // Flow control no longer needed.  We only initiate flow control
-      //  after completing a chunk so we know the next state is
-      //  CHUNK_READ_SIZE_START
-      Debug("http_chunk_flow", "Suspending flow control");
-      p->chunked_handler.state = ChunkedHandler::CHUNK_READ_SIZE_START;
-
-      // Call back the tunnel as if we've received more data from the server
-      int r = tunnel->main_handler(p->chunked_handler.last_server_event, 
p->read_vio);
-
-      // Only actually reenable the server if we've stayed out of the
-      //  flow control state.  The callout may have killed the vc
-      //  and/or the vio so check that the producer is still alive
-      //  (INKqa05512)
-      // Also, make sure the tunnel has not been deallocated on
-      //  the call to tunnel->main_handler
-      if (r == EVENT_CONT && p->alive && p->chunked_handler.state != 
ChunkedHandler::CHUNK_FLOW_CONTROL) {
-        // INKqa05737 - since we explicitly disabled the vc by setting
-        //  nbytes = ndone when going into flow control, we need
-        //  set nbytes up again here
-        p->read_vio->nbytes = INT64_MAX;
-        p->read_vio->reenable();
-      }
-    } else {
-      Debug("http_chunk_flow", "Blocking reenable - flow control in effect");
-    }
-  }
-}
-
-static void
-add_chunked_reenable(HttpTunnelProducer * p, HttpTunnel * tunnel)
-{
-  if (p->chunked_handler.state != ChunkedHandler::CHUNK_FLOW_CONTROL) {
-    p->read_vio->reenable();
-  } else {
-    // If we are in are in the flow control, there's data in
-    //   the incoming buffer that we haven't processed yet
-    //   Only process it if we determine the client isn't overflowed
-    MIOBuffer *cbuf = p->chunked_handler.chunked_buffer;
-    if (cbuf->max_read_avail() < max_chunked_ahead_bytes && 
cbuf->max_block_count() < max_chunked_ahead_blocks) {
-      // Flow control no longer needed.
-      Debug("http_chunk_flow", "Suspending flow control on enchunking");
-      p->chunked_handler.state = ChunkedHandler::CHUNK_WRITE_CHUNK;
-
-      // Call back the tunnel as if we've received more data from
-      //   the server
-      int r = tunnel->main_handler(p->chunked_handler.last_server_event, 
p->read_vio);
-
-      // Only actually reenable the server if we've stayed out of the
-      //  flow control state.  The callout may have killed the vc
-      //  and/or the vio so check that the producer is still alive
-      // Also, make sure the tunnel has not been deallocated on
-      //  the call to tunnel->main_handler
-      if (r == EVENT_CONT && p->alive && p->chunked_handler.state != 
ChunkedHandler::CHUNK_FLOW_CONTROL) {
-        p->read_vio->reenable();
-      }
-    } else {
-      Debug("http_chunk_flow", "Blocking reenable on enchunking - flow control 
in effect");
-    }
+char
+VcTypeCode(HttpTunnelType_t t) {
+  char zret = ' ';
+  switch (t) {
+  case HT_HTTP_CLIENT: zret = 'U'; break;
+  case HT_HTTP_SERVER: zret = 'S'; break;
+  case HT_TRANSFORM: zret = 'T'; break;
+  case HT_CACHE_READ: zret = 'R'; break;
+  case HT_CACHE_WRITE: zret = 'W'; break;
+  default: break;
   }
+  return zret;
 }
 
 ChunkedHandler::ChunkedHandler()
@@ -281,15 +216,7 @@ ChunkedHandler::read_chunk()
   if (bytes_left == 0) {
     Debug("http_chunk", "completed read of chunk of %" PRId64" bytes", 
cur_chunk_size);
 
-    // Check to see if we need to flow control the output
-    if (dechunked_buffer &&
-        (dechunked_buffer->max_read_avail() > max_chunked_ahead_bytes ||
-         dechunked_buffer->max_block_count() > max_chunked_ahead_blocks)) {
-      state = CHUNK_FLOW_CONTROL;
-      Debug("http_chunk_flow", "initiating flow control pause");
-    } else {
-      state = CHUNK_READ_SIZE_START;
-    }
+    state = CHUNK_READ_SIZE_START;
   } else if (bytes_left > 0) {
     Debug("http_chunk", "read %" PRId64" bytes of an %" PRId64" chunk", b, 
cur_chunk_size);
   }
@@ -301,7 +228,7 @@ ChunkedHandler::read_trailer()
   int64_t bytes_used;
   bool done = false;
 
-  while (chunked_reader->read_avail() > 0 && !done) {
+  while (chunked_reader->is_read_avail_more_than(0) && !done) {
     const char *tmp = chunked_reader->start();
     int64_t data_size = chunked_reader->block_read_avail();
 
@@ -341,7 +268,7 @@ ChunkedHandler::read_trailer()
 
 bool ChunkedHandler::process_chunked_content()
 {
-  while (chunked_reader->read_avail() > 0 && state != CHUNK_READ_DONE && state 
!= CHUNK_READ_ERROR) {
+  while (chunked_reader->is_read_avail_more_than(0) && state != 
CHUNK_READ_DONE && state != CHUNK_READ_ERROR) {
     switch (state) {
     case CHUNK_READ_SIZE:
     case CHUNK_READ_SIZE_CRLF:
@@ -385,48 +312,36 @@ bool ChunkedHandler::generate_chunked_content()
   while ((r_avail = dechunked_reader->read_avail()) > 0 && state != 
CHUNK_WRITE_DONE) {
     int64_t write_val = MIN(max_chunk_size, r_avail);
 
-    // If the server is still alive, check to see if too much data is
-    //    pilling up on the client's buffer.  If the server is done, ignore
-    //    the flow control rules so that we don't have to bother with stopping
-    //    the io an coming a back and dealing with the server's data later
-    if (server_done == false &&
-        (chunked_buffer->max_read_avail() > max_chunked_ahead_bytes ||
-         chunked_buffer->max_block_count() > max_chunked_ahead_blocks)) {
-      state = CHUNK_FLOW_CONTROL;
-      Debug("http_chunk_flow", "initiating flow control pause on enchunking");
-      return false;
-    } else {
-      state = CHUNK_WRITE_CHUNK;
-      Debug("http_chunk", "creating a chunk of size %" PRId64" bytes", 
write_val);
-
-      // Output the chunk size.
-      if (write_val != max_chunk_size) {
-        int len = snprintf(tmp, sizeof(tmp), CHUNK_HEADER_FMT, write_val);
-        chunked_buffer->write(tmp, len);
-        chunked_size += len;
-      } else {
-        chunked_buffer->write(max_chunk_header, max_chunk_header_len);
-        chunked_size += max_chunk_header_len;
-      }
+    state = CHUNK_WRITE_CHUNK;
+    Debug("http_chunk", "creating a chunk of size %" PRId64 " bytes", 
write_val);
 
-      // Output the chunk itself.
-      //
-      // BZ# 54395 Note - we really should only do a
-      //   block transfer if there is sizable amount of
-      //   data (like we do for the case where we are
-      //   removing chunked encoding in ChunkedHandler::transfer_bytes()
-      //   However, I want to do this fix with as small a risk
-      //   as possible so I'm leaving this issue alone for
-      //   now
-      //
-      chunked_buffer->write(dechunked_reader, write_val);
-      chunked_size += write_val;
-      dechunked_reader->consume(write_val);
-
-      // Output the trailing CRLF.
-      chunked_buffer->write("\r\n", 2);
-      chunked_size += 2;
+    // Output the chunk size.
+    if (write_val != max_chunk_size) {
+      int len = snprintf(tmp, sizeof(tmp), CHUNK_HEADER_FMT, write_val);
+      chunked_buffer->write(tmp, len);
+      chunked_size += len;
+    } else {
+      chunked_buffer->write(max_chunk_header, max_chunk_header_len);
+      chunked_size += max_chunk_header_len;
     }
+
+    // Output the chunk itself.
+    //
+    // BZ# 54395 Note - we really should only do a
+    //   block transfer if there is sizable amount of
+    //   data (like we do for the case where we are
+    //   removing chunked encoding in ChunkedHandler::transfer_bytes()
+    //   However, I want to do this fix with as small a risk
+    //   as possible so I'm leaving this issue alone for
+    //   now
+    //
+    chunked_buffer->write(dechunked_reader, write_val);
+    chunked_size += write_val;
+    dechunked_reader->consume(write_val);
+
+    // Output the trailing CRLF.
+    chunked_buffer->write("\r\n", 2);
+    chunked_size += 2;
   }
 
   if (server_done) {
@@ -445,11 +360,67 @@ HttpTunnelProducer::HttpTunnelProducer()
     vc(NULL), vc_handler(NULL), read_vio(NULL), read_buffer(NULL),
     buffer_start(NULL), vc_type(HT_HTTP_SERVER), 
chunking_action(TCA_PASSTHRU_DECHUNKED_CONTENT),
     do_chunking(false), do_dechunking(false), do_chunked_passthru(false),
-    init_bytes_done(0), nbytes(0), ntodo(0), bytes_read(0), handler_state(0), 
num_consumers(0), alive(false),
-    read_success(false), name(NULL)
+    init_bytes_done(0), nbytes(0), ntodo(0), bytes_read(0),
+    handler_state(0), num_consumers(0), alive(false),
+    read_success(false), flow_control_source(0), name(NULL)
 {
 }
 
+uint64_t
+HttpTunnelProducer::backlog(uint64_t limit) {
+  uint64_t zret = 0;
+  // Calculate the total backlog, the # of bytes inside ATS for this producer.
+  // We go all the way through each chain to the ending sink and take the 
maximum
+  // over those paths. Do need to be careful about loops which can occur.
+  for ( HttpTunnelConsumer* c = consumer_list.head ; c ; c = c->link.next ) {
+    if (c->alive && c->write_vio) {
+      uint64_t n = 0;
+      if (HT_TRANSFORM == c->vc_type) {
+        n += static_cast<TransformVCChain*>(c->vc)->backlog(limit);
+      } else {
+        IOBufferReader* r = c->write_vio->get_reader();
+        if (r) {
+          n += static_cast<uint64_t>(r->read_avail());
+        }
+      }
+      if (n >= limit) return n;
+
+      if (!c->is_sink()) {
+        HttpTunnelProducer* dsp = c->self_producer;
+        if (dsp) {
+          n += dsp->backlog();
+        }
+      }
+      if (n >= limit) return n;
+      if (n > zret) zret = n;
+    }
+  }
+
+  if (chunked_handler.chunked_reader) {
+    zret += 
static_cast<uint64_t>(chunked_handler.chunked_reader->read_avail());
+  }
+
+  return zret;
+}
+
+/*  We set the producers in a flow chain specifically rather than
+    using a tunnel level variable in order to handle bi-directional
+    tunnels correctly. In such a case the flow control on producers is
+    not related so a single value for the tunnel won't work.
+*/
+void
+HttpTunnelProducer::set_throttle_src(HttpTunnelProducer* srcp) {
+  HttpTunnelProducer* p = this;
+  p->flow_control_source = srcp;
+  for ( HttpTunnelConsumer* c = consumer_list.head ; c ; c = c->link.next ) {
+    if (!c->is_sink()) {
+      p = c->self_producer;
+      if (p)
+        p->set_throttle_src(srcp);
+    }
+  }
+}
+
 HttpTunnelConsumer::HttpTunnelConsumer()
   : link(), producer(NULL), self_producer(NULL), vc_type(HT_HTTP_CLIENT), 
vc(NULL), buffer_reader(NULL),
     vc_handler(NULL), write_vio(NULL), skip_bytes(0), bytes_written(0), 
handler_state(0), alive(false),
@@ -463,14 +434,31 @@ HttpTunnel::HttpTunnel()
 }
 
 void
+HttpTunnel::init(HttpSM * sm_arg, ProxyMutex * amutex)
+{
+  HttpConfigParams* params = sm_arg->t_state.http_config_param;
+  sm = sm_arg;
+  active = false;
+  mutex = amutex;
+  SET_HANDLER(&HttpTunnel::main_handler);
+  flow_state.enabled_p = params->oride.flow_control_enabled;
+  if (params->oride.flow_low_water_mark > 0)
+    flow_state.low_water = params->oride.flow_low_water_mark;
+  if (params->oride.flow_high_water_mark > 0)
+    flow_state.high_water = params->oride.flow_high_water_mark;
+  // This should always be true, we handled default cases back in 
HttpConfig::reconfigure()
+  ink_assert(flow_state.low_water <= flow_state.high_water);
+}
+
+void
 HttpTunnel::reset()
 {
   ink_assert(active == false);
 #ifdef DEBUG
-  for (int i = 0; i < MAX_PRODUCERS; i++) {
+  for (int i = 0; i < MAX_PRODUCERS; ++i) {
     ink_assert(producers[i].alive == false);
   }
-  for (int j = 0; j < MAX_CONSUMERS; j++) {
+  for (int j = 0; j < MAX_CONSUMERS; ++j) {
     ink_assert(consumers[j].alive == false);
   }
 #endif
@@ -484,7 +472,7 @@ HttpTunnel::reset()
 void
 HttpTunnel::kill_tunnel()
 {
-  for (int i = 0; i < MAX_PRODUCERS; i++) {
+  for (int i = 0; i < MAX_PRODUCERS; ++i) {
     if (producers[i].vc != NULL) {
       chain_abort_all(&producers[i]);
     }
@@ -499,7 +487,7 @@ HttpTunnel::kill_tunnel()
 HttpTunnelProducer *
 HttpTunnel::alloc_producer()
 {
-  for (int i = 0; i < MAX_PRODUCERS; i++) {
+  for (int i = 0; i < MAX_PRODUCERS; ++i) {
     if (producers[i].vc == NULL) {
       num_producers++;
       ink_assert(num_producers <= MAX_PRODUCERS);
@@ -529,7 +517,7 @@ HttpTunnel::deallocate_buffers()
 {
   int num = 0;
   ink_release_assert(active == false);
-  for (int i = 0; i < MAX_PRODUCERS; i++) {
+  for (int i = 0; i < MAX_PRODUCERS; ++i) {
     if (producers[i].read_buffer != NULL) {
       ink_assert(producers[i].vc != NULL);
       free_MIOBuffer(producers[i].read_buffer);
@@ -678,6 +666,16 @@ HttpTunnel::add_consumer(VConnection * vc,
   return c;
 }
 
+void
+HttpTunnel::chain(HttpTunnelConsumer* c, HttpTunnelProducer* p)
+{
+  p->self_consumer = c;
+  c->self_producer = p;
+  // If the flow is already throttled update the chained producer.
+  if (c->producer->is_throttled())
+    p->set_throttle_src(c->producer->flow_control_source);
+}
+
 // void HttpTunnel::tunnel_run()
 //
 //    Makes the tunnel go
@@ -694,7 +692,7 @@ HttpTunnel::tunnel_run(HttpTunnelProducer * p_arg)
 
     ink_assert(active == false);
 
-    for (int i = 0; i < MAX_PRODUCERS; i++) {
+    for (int i = 0 ; i < MAX_PRODUCERS ; ++i) {
       p = producers + i;
       if (p->vc != NULL) {
         producer_run(p);
@@ -952,7 +950,6 @@ HttpTunnel::producer_run(HttpTunnelProducer * p)
   p->buffer_start = NULL;
 }
 
-
 int
 HttpTunnel::producer_handler_dechunked(int event, HttpTunnelProducer * p)
 {
@@ -966,7 +963,8 @@ HttpTunnel::producer_handler_dechunked(int event, 
HttpTunnelProducer * p)
   case VC_EVENT_READ_COMPLETE:
   case HTTP_TUNNEL_EVENT_PRECOMPLETE:
   case VC_EVENT_EOS:
-    p->chunked_handler.last_server_event = event;
+    p->last_event =
+      p->chunked_handler.last_server_event = event;
     // TODO: Should we check the return code?
     p->chunked_handler.generate_chunked_content();
     break;
@@ -1001,7 +999,8 @@ HttpTunnel::producer_handler_chunked(int event, 
HttpTunnelProducer * p)
     return event;
   }
 
-  p->chunked_handler.last_server_event = event;
+  p->last_event =
+    p->chunked_handler.last_server_event = event;
   bool done = p->chunked_handler.process_chunked_content();
 
   // If we couldn't understand the encoding, return
@@ -1014,18 +1013,6 @@ HttpTunnel::producer_handler_chunked(int event, 
HttpTunnelProducer * p)
     //  sense but no reenables follow
     return VC_EVENT_EOS;
   }
-  // If we are in a flow control state, there is still data in
-  //   buffer so return READ_READY
-  if (p->read_vio && p->chunked_handler.state == 
ChunkedHandler::CHUNK_FLOW_CONTROL) {
-    // INKqa05737 - We need force the server vc to
-    //   disabled since the server may have sent the
-    //   last chunk.  When we go to process that last chunk,
-    //   we will move the server to a keep alive state.  Since
-    //   we are prohibited from changing the buffer, we need
-    //   make sure the iocore doesn't schedule a read
-    p->read_vio->nbytes = p->read_vio->ndone;
-    return VC_EVENT_READ_READY;
-  }
 
   switch (event) {
   case VC_EVENT_READ_READY:
@@ -1077,6 +1064,8 @@ bool HttpTunnel::producer_handler(int event, 
HttpTunnelProducer * p)
     }
   } else if (p->do_dechunking || p->do_chunked_passthru) {
     event = producer_handler_chunked(event, p);
+  } else {
+    p->last_event = event;
   }
 
   //YTS Team, yamsat Plugin
@@ -1185,6 +1174,53 @@ bool HttpTunnel::producer_handler(int event, 
HttpTunnelProducer * p)
   return sm_callback;
 }
 
+bool
+HttpTunnel::consumer_reenable(HttpTunnelConsumer* c)
+{
+  HttpTunnelProducer* p = c->producer;
+  HttpTunnelProducer* srcp = p->flow_control_source;
+  if (p->alive
+#ifndef LAZY_BUF_ALLOC
+      && p->read_buffer->write_avail() > 0
+#endif
+    ) {
+    // Only do flow control if enabled and the producer is an external
+    // source.  Otherwise disable by making the backlog zero. Because
+    // the backlog short cuts quit when the value is equal (or
+    // greater) to the target, we use strict comparison only for
+    // checking low water, otherwise the flow control can stall out.
+    uint64_t backlog = (flow_state.enabled_p && p->is_source())
+      ? p->backlog(flow_state.high_water)
+      : 0;
+
+    if (backlog >= flow_state.high_water) {
+      if (is_debug_tag_set("http_tunnel"))
+        Debug("http_tunnel", "Throttle   %p %" PRId64 " / %" PRId64, p, 
backlog, p->backlog());
+      p->throttle(); // p becomes srcp for future calls to this method
+    } else {
+      if (srcp && c->is_sink()) {
+        // Check if backlog is below low water - note we need to check
+        // against the source producer, not necessarily the producer
+        // for this consumer. We don't have to recompute the backlog
+        // if they are the same because we know low water <= high
+        // water so the value is sufficiently accurate.
+        if (srcp != p)
+          backlog = srcp->backlog(flow_state.low_water);
+        if (backlog < flow_state.low_water) {
+          if (is_debug_tag_set("http_tunnel"))
+            Debug("http_tunnel", "Unthrottle %p %" PRId64 " / %" PRId64, p, 
backlog, p->backlog());
+          srcp->unthrottle();
+          srcp->read_vio->reenable();
+          // Kick source producer to get flow ... well, flowing.
+          this->producer_handler(VC_EVENT_READ_READY, srcp);
+        }
+      }
+      p->read_vio->reenable();
+    }
+  }
+  return p->is_throttled();
+}
+
 //
 // bool HttpTunnel::consumer_handler(int event, HttpTunnelConsumer* p)
 //
@@ -1200,6 +1236,7 @@ bool HttpTunnel::consumer_handler(int event, 
HttpTunnelConsumer * c)
 {
   bool sm_callback = false;
   HttpConsumerHandler jump_point;
+  HttpTunnelProducer* p = c->producer;
 
   Debug("http_tunnel", "[%" PRId64 "] consumer_handler [%s %s]", sm->sm_id, 
c->name, HttpDebugNames::get_event_name(event));
 
@@ -1207,31 +1244,7 @@ bool HttpTunnel::consumer_handler(int event, 
HttpTunnelConsumer * c)
 
   switch (event) {
   case VC_EVENT_WRITE_READY:
-    // Data consumed, reenable producer
-    if (c->producer->alive) {
-      if (c->producer->do_dechunking) {
-        // Because dechunking decouples the inbound and outbound
-        //  buffers, we have to run special code handle the
-        //  reenable
-        chunked_reenable(c->producer, this);
-      } else if (c->producer->do_chunking) {
-        add_chunked_reenable(c->producer, this);
-      } else {
-        /*
-         * Dont check for space availability. The
-         * net code adds more space if required.
-         */
-
-#ifndef LAZY_BUF_ALLOC
-        if (c->producer->read_buffer->write_avail() > 0) {
-          c->producer->read_vio->reenable();
-        }
-#else
-        c->producer->read_vio->reenable();
-#endif
-
-      }
-    }
+    this->consumer_reenable(c);
     break;
 
   case VC_EVENT_WRITE_COMPLETE:
@@ -1262,13 +1275,20 @@ bool HttpTunnel::consumer_handler(int event, 
HttpTunnelConsumer * c)
     //    the SM since the reenabling has the side effect
     //    updating the buffer state for the VConnection
     //    that is being reenabled
-    if (c->producer->alive && c->producer->read_vio
+    if (p->alive && p->read_vio
 #ifndef LAZY_BUF_ALLOC
-        && c->producer->read_buffer->write_avail() > 0
+        && p->read_buffer->write_avail() > 0
 #endif
       ) {
-      c->producer->read_vio->reenable();
+      if (p->is_throttled())
+        this->consumer_reenable(c);
+      else
+        p->read_vio->reenable();
     }
+    // [amc] I don't think this happens but we'll leave a debug trap
+    // here just in case.
+    if (p->is_throttled())
+      Debug("http_tunnel", "Special event %s on %p with flow control on", 
HttpDebugNames::get_event_name(event), p);
     break;
 
   case VC_EVENT_READ_READY:

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e768cb61/proxy/http/HttpTunnel.h
----------------------------------------------------------------------
diff --git a/proxy/http/HttpTunnel.h b/proxy/http/HttpTunnel.h
index f9c4637..2de633c 100644
--- a/proxy/http/HttpTunnel.h
+++ b/proxy/http/HttpTunnel.h
@@ -175,6 +175,16 @@ struct HttpTunnelConsumer
   bool alive;
   bool write_success;
   const char *name;
+
+  /** Check if this consumer is downstream from @a vc.
+      @return @c true if any producer in the tunnel eventually feeds
+      data to this consumer.
+  */
+  bool is_downstream_from(VConnection* vc);
+  /** Check if this is a sink (final data destination).
+      @return @c true if data exits the ATS process at this consumer.
+  */
+  bool is_sink() const;
 };
 
 struct HttpTunnelProducer
@@ -202,12 +212,44 @@ struct HttpTunnelProducer
   int64_t ntodo;                    // what this vc needs to do
   int64_t bytes_read;               // total bytes read from the vc
   int handler_state;              // state used the handlers
+  int last_event;                   ///< Tracking for flow control restarts.
 
   int num_consumers;
 
   bool alive;
   bool read_success;
+  /// Flag and pointer for active flow control throttling.
+  /// If this is set, it points at the source producer that is under flow 
control.
+  /// If @c NULL then data flow is not being throttled.
+  HttpTunnelProducer* flow_control_source;
   const char *name;
+
+  /** Get the largest number of bytes any consumer has not consumed.
+      Use @a limit if you only need to check if the backlog is at least @a 
limit.
+      @return The actual backlog or a number at least @a limit.
+   */
+  uint64_t backlog(
+                  uint64_t limit = INTU64_MAX ///< More than this is irrelevant
+                  );
+  /// Check if producer is original (to ATS) source of data.
+  /// @return @c true if this producer is the source of bytes from outside ATS.
+  bool is_source() const;
+  /// Throttle the flow.
+  void throttle();
+  /// Unthrottle the flow.
+  void unthrottle();
+  /// Check throttled state.
+  bool is_throttled() const;
+
+  /** Set the flow control source producer for the flow.
+      This sets the value for this producer and all downstream producers.
+      @note This is the implementation for @c throttle and @c unthrottle.
+      @see throttle
+      @see unthrottle
+  */
+  void set_throttle_src(
+                       HttpTunnelProducer* srcp ///< Source producer of flow.
+                       );
 };
 
 class PostDataBuffers
@@ -229,6 +271,26 @@ class HttpTunnel:public Continuation
 {
   friend class HttpPagesHandler;
   friend class CoreUtils;
+
+  /** Data for implementing flow control across a tunnel.
+
+      The goal is to bound the amount of data buffered for a
+      transaction flowing through the tunnel to (roughly) between the
+      @a high_water and @a low_water water marks. Due to the chunky nater of 
data
+      flow this always approximate.
+  */
+  struct FlowControl {
+    // Default value for high and low water marks.
+    static uint64_t const DEFAULT_WATER_MARK = 1<<16;
+
+    uint64_t high_water; ///< Buffered data limit - throttle if more than this.
+    uint64_t low_water; ///< Unthrottle if less than this buffered.
+    bool enabled_p; ///< Flow control state (@c false means disabled).
+
+    /// Default constructor.
+    FlowControl();
+  };
+
 public:
   HttpTunnel();
 
@@ -237,7 +299,7 @@ public:
   void kill_tunnel();
   bool is_tunnel_active() { return active; }
   bool is_tunnel_alive();
-  bool is_there_cache_write();
+  bool has_cache_writer();
 
   // YTS Team, yamsat Plugin
   void copy_partial_post_data();
@@ -266,6 +328,7 @@ public:
   void tunnel_run(HttpTunnelProducer * p = NULL);
 
   int main_handler(int event, void *data);
+  bool consumer_reenable(HttpTunnelConsumer* c);
   bool consumer_handler(int event, HttpTunnelConsumer * c);
   bool producer_handler(int event, HttpTunnelProducer * p);
   int producer_handler_dechunked(int event, HttpTunnelProducer * p);
@@ -277,6 +340,18 @@ public:
   void abort_cache_write_finish_others(HttpTunnelProducer * p);
   void append_message_to_producer_buffer(HttpTunnelProducer * p, const char 
*msg, int64_t msg_len);
 
+  /** Mark a producer and consumer as the same underlying object.
+
+      This is use to chain producer/consumer pairs together to
+      indicate the data flows through them sequentially. The primary
+      example is a transform which serves as a consumer on the server
+      side and a producer on the cache/client side.
+  */
+  void chain(
+            HttpTunnelConsumer* c,  ///< Flow goes in here
+            HttpTunnelProducer* p   ///< Flow comes back out here
+            );
+
   void close_vc(HttpTunnelProducer * p);
   void close_vc(HttpTunnelConsumer * c);
 
@@ -301,6 +376,9 @@ private:
 
   bool active;
 
+  /// State data about flow control.
+  FlowControl flow_state;
+
 public:
   PostDataBuffers * postbuf;
 };
@@ -364,15 +442,6 @@ HttpTunnel::is_tunnel_alive()
   return tunnel_alive;
 }
 
-inline void
-HttpTunnel::init(HttpSM * sm_arg, ProxyMutex * amutex)
-{
-  sm = sm_arg;
-  active = false;
-  mutex = amutex;
-  SET_HANDLER(&HttpTunnel::main_handler);
-}
-
 inline HttpTunnelProducer *
 HttpTunnel::get_producer(VConnection * vc)
 {
@@ -429,7 +498,7 @@ 
HttpTunnel::append_message_to_producer_buffer(HttpTunnelProducer * p, const char
 }
 
 inline bool
-HttpTunnel::is_there_cache_write()
+HttpTunnel::has_cache_writer()
 {
   for (int i = 0; i < MAX_CONSUMERS; i++) {
     if (consumers[i].vc_type == HT_CACHE_WRITE && consumers[i].vc != NULL) {
@@ -438,4 +507,63 @@ HttpTunnel::is_there_cache_write()
   }
   return false;
 }
+
+inline bool
+HttpTunnelConsumer::is_downstream_from(VConnection *vc)
+{
+  HttpTunnelProducer* p = producer;
+  HttpTunnelConsumer* c;
+  while (p) {
+    if (p->vc == vc) return true;
+    // The producer / consumer chain can contain a cycle in the case
+    // of a blind tunnel so give up if we find ourself (the original
+    // consumer).
+    c = p->self_consumer;
+    p = (c && c != this) ? c->producer : 0;
+  }
+  return false;
+}
+
+inline bool
+HttpTunnelConsumer::is_sink() const
+{
+  return HT_HTTP_CLIENT == vc_type || HT_CACHE_WRITE == vc_type;
+}
+
+inline bool
+HttpTunnelProducer::is_source() const
+{
+  // If a producer is marked as a client, then it's part of a bidirectional 
tunnel
+  // and so is an actual source of data.
+  return HT_HTTP_SERVER == vc_type || HT_CACHE_READ == vc_type || 
HT_HTTP_CLIENT == vc_type;
+}
+
+inline bool
+HttpTunnelProducer::is_throttled() const
+{
+  return 0 != flow_control_source;
+}
+
+inline void
+HttpTunnelProducer::throttle()
+{
+  if (!this->is_throttled())
+    this->set_throttle_src(this);
+}
+
+inline void
+HttpTunnelProducer::unthrottle()
+{
+  if (this->is_throttled())
+    this->set_throttle_src(0);
+}
+
+inline
+HttpTunnel::FlowControl::FlowControl()
+         : high_water(DEFAULT_WATER_MARK)
+         , low_water(DEFAULT_WATER_MARK)
+         , enabled_p(false)
+{
+}
+
 #endif

Reply via email to