Hi all,
I am working on adding support for client-side bandwidth limiting to Squid3. This feature is described at http://wiki.squid-cache.org/Features/ClientBandwidthLimit

The feature was implemented by Alex Rousskov for his 3p1-rock branch. In this mail I am including a first port to squid-trunk.

The patch needs more testing and probably further development, but I am posting the current version for preview and initial feedback.

A short description is included in the patch.

Regards,
    Christos
Client-side bandwidth limit (a.k.a., quota or delay pool) implementation.

In mobile environments, Squid may need to limit Squid-to-client bandwidth
available to individual users, identified by their IP addresses. The IP
address pool can be as large as a /10 IPv4 network (4 million unique IP
addresses) and even larger in IPv6 environments. On the other hand, the code
should support thousands of connections coming from a single IP (e.g., a child
proxy).

The implementation is based on storing bandwidth-related "bucket" information
in the existing "client database" hash (client_db.cc). The old code already
assigned each client IP a single ClientInfo object, which satisfies the
client-side IP-based bandwidth pooling requirements. The old hash size is
increased to support up to 32K concurrent clients if needed.

Client-side pools are configured similarly to server-side ones, but there is
only one pool class. See client_delay_pools,
client_delay_initial_bucket_level, client_delay_parameters, and
client_delay_access in squid.conf. The client_delay_access matches the client
with delay parameters. It does not pool clients from different IP addresses
together.

Special care is taken to provide fair distribution of bandwidth among clients
sharing the same bucket (i.e., clients coming from the same IP address).
Multiple same-IP clients competing for bandwidth are queued using a FIFO
algorithm. If a bucket becomes empty, the first client among those sharing the
bucket is delayed by 1 second before it can attempt to receive more response
data from Squid.  This delay may need to be lowered in high-bandwidth
environments.

This feature has been documented at
http://wiki.squid-cache.org/Features/ClientBandwidthLimit

=== modified file 'include/Array.h'
--- include/Array.h	2009-07-13 01:20:26 +0000
+++ include/Array.h	2010-10-11 15:47:38 +0000
@@ -80,40 +80,42 @@
 
 public:
     typedef E value_type;
     typedef E* pointer;
     typedef VectorIteratorBase<Vector<E> > iterator;
     typedef VectorIteratorBase<Vector<E> const> const_iterator;
 
     void *operator new (size_t);
     void operator delete (void *);
 
     Vector();
     ~Vector();
     Vector(Vector const &);
     Vector &operator = (Vector const &);
     void clean();
     void reserve (size_t capacity);
     void push_back (E);
     Vector &operator += (E item) {push_back(item); return *this;};
 
     void insert (E);
+    const E &front() const;
+    E &front();
     E &back();
     E pop_back();
     E shift();         // aka pop_front
     void prune(E);
     void preAppend(int app_count);
     bool empty() const;
     size_t size() const;
     iterator begin();
     const_iterator begin () const;
     iterator end();
     const_iterator end () const;
     E& operator [] (unsigned i);
     const E& operator [] (unsigned i) const;
 
     /* Do not change these, until the entry C struct is removed */
     size_t capacity;
     size_t count;
     E *items;
 };
 
@@ -231,40 +233,56 @@
 
 template<class E>
 E
 Vector<E>::pop_back()
 {
     assert (size());
     value_type result = items[--count];
     items[count] = value_type();
     return result;
 }
 
 template<class E>
 E &
 Vector<E>::back()
 {
     assert (size());
     return items[size() - 1];
 }
 
 template<class E>
+const E &
+Vector<E>::front() const
+{
+    assert (size());
+    return items[0];
+}
+
+template<class E>
+E &
+Vector<E>::front()
+{
+    assert (size());
+    return items[0];
+}
+
+template<class E>
 void
 Vector<E>::prune(E item)
 {
     unsigned int n = 0;
     for (unsigned int i = 0; i < count; i++) {
         if (items[i] != item) {
             if (i != n)
                 items[n] = items[i];
             n++;
         }
     }
 
     count = n;
 }
 
 /* if you are going to append a known and large number of items, call this first */
 template<class E>
 void
 Vector<E>::preAppend(int app_count)
 {

=== modified file 'src/ClientInfo.h'
--- src/ClientInfo.h	2010-05-02 19:32:42 +0000
+++ src/ClientInfo.h	2010-10-11 15:47:38 +0000
@@ -1,33 +1,95 @@
 #ifndef SQUID__SRC_CLIENTINFO_H
 #define SQUID__SRC_CLIENTINFO_H
 
 #include "ip/Address.h"
 #include "hash.h"
 #include "enums.h"
 #include "typedefs.h"
+#include "cbdata.h"
+#include "Array.h"
+
+#if DELAY_POOLS
+class CommQuotaQueue;
+#endif
 
 class ClientInfo
 {
 public:
     hash_link hash;             /* must be first */
 
     Ip::Address addr;
 
     struct {
         int result_hist[LOG_TYPE_MAX];
         int n_requests;
         kb_t kbytes_in;
         kb_t kbytes_out;
         kb_t hit_kbytes_out;
     } Http, Icp;
 
     struct {
         time_t time;
         int n_req;
         int n_denied;
     } cutoff;
     int n_established;          /* number of current established connections */
     time_t last_seen;
+#if DELAY_POOLS
+    double writeSpeedLimit;/* write speed limit in bytes per second; can be less than 1, but values too close to zero could result in client timeouts */
+    double prevTime;/* previous time when we checked */
+    double bucketSize; ///< how much can be written now
+    double bucketSizeLimit;  ///< maximum bucket size
+    bool writeLimitingActive; /* whether the write limiter is active */
+    bool firstTimeConnection;/* true until clientdbSetWriteLimiter() first configures this client */
+
+    CommQuotaQueue *quotaQueue; ///< clients waiting for more write quota
+    int rationedQuota; ///< precomputed quota preserving fairness among clients
+    int rationedCount; ///< number of clients that will receive rationedQuota
+    bool selectWaiting; ///< is between commSetSelect and commHandleWrite
+    bool eventWaiting; ///< waiting for commHandleWriteHelper event to fire
+
+    // all of these methods access the Comm fd_table and are defined in comm.cc
+    bool hasQueue() const;  ///< whether any clients are waiting for write quota
+    bool hasQueue(const CommQuotaQueue*) const;  ///< has a given queue
+    unsigned int quotaEnqueue(int fd); ///< client starts waiting in queue; create the queue if necessary
+    int quotaPeekFd() const; ///< returns the fd at the head of the queue
+    unsigned int quotaPeekReserv() const; ///< returns the next reserv. to pop
+    void quotaDequeue(); ///< pops queue head from queue
+    void kickQuotaQueue(); ///< schedule commHandleWriteHelper call
+    int quotaForDequed(); ///< allocate quota for a just dequeued client
+    void refillBucket(); ///< adds bytes to bucket based on rate and time
+
+    void quotaDumpQueue(); ///< dumps quota queue for debugging
+#endif
+};
+
+#if DELAY_POOLS
+// a queue of Comm clients waiting for I/O quota controlled by delay pools
+class CommQuotaQueue
+{
+public:
+    CommQuotaQueue(ClientInfo *info);
+    ~CommQuotaQueue();
+
+    bool empty() const { return fds.empty(); } ///< whether no descriptor is waiting
+    size_t size() const { return fds.size(); } ///< number of waiting descriptors
+    int front() const { return fds.front(); } ///< descriptor at the queue head; queue must not be empty
+    unsigned int enqueue(int fd); ///< adds fd to the queue; presumably returns its reservation ID (TODO confirm)
+    void dequeue(); ///< removes the queue head
+
+    ClientInfo *clientInfo; ///< bucket responsible for quota maintenance
+
+    // these counters may wrap around; that is OK (and well defined, being unsigned) because they are for IDs only
+    unsigned int ins; ///< number of enqueue calls, used to generate a "reservation" ID
+    unsigned int outs; ///< number of dequeue calls, used to check the "reservation" ID
+
+private:
+    // TODO: optimize using a Ring- or List-based store?
+    typedef Vector<int> Store;
+    Store fds; ///< descriptor queue
+
+    CBDATA_CLASS2(CommQuotaQueue);
 };
+#endif /* DELAY_POOLS */
 
 #endif

=== modified file 'src/Makefile.am'
--- src/Makefile.am	2010-10-05 11:34:01 +0000
+++ src/Makefile.am	2010-10-11 15:47:38 +0000
@@ -58,41 +58,45 @@
 	delay_pools.cc \
 	DelayId.cc \
 	DelayId.h \
 	DelayIdComposite.h \
 	DelayBucket.cc \
 	DelayBucket.h \
 	DelayConfig.cc \
 	DelayConfig.h \
 	DelayPool.cc \
 	DelayPool.h \
 	DelayPools.h \
 	DelaySpec.cc \
 	DelaySpec.h \
 	DelayTagged.cc \
 	DelayTagged.h \
 	DelayUser.cc \
 	DelayUser.h \
 	DelayVector.cc \
 	DelayVector.h \
 	NullDelayId.cc \
-	NullDelayId.h
+	NullDelayId.h \
+	\
+	ClientDelayConfig.cc \
+	ClientDelayConfig.h
+	
 if USE_DELAY_POOLS
 DELAY_POOL_SOURCE = $(DELAY_POOL_ALL_SOURCE)
 else
 DELAY_POOL_SOURCE = 
 endif
 
 if ENABLE_XPROF_STATS
 XPROF_STATS_SOURCE = ProfStats.cc
 else
 XPROF_STATS_SOURCE = 
 endif
 
 if ENABLE_HTCP
 HTCPSOURCE = htcp.cc htcp.h
 endif
 
 if MAKE_LEAKFINDER
 LEAKFINDERSOURCE =  LeakFinder.cc
 else
 LEAKFINDERSOURCE = 

=== modified file 'src/cache_cf.cc'
--- src/cache_cf.cc	2010-10-06 13:03:11 +0000
+++ src/cache_cf.cc	2010-10-11 15:51:06 +0000
@@ -1465,40 +1465,82 @@
 static void
 parse_delay_pool_class(DelayConfig * cfg)
 {
     cfg->parsePoolClass();
 }
 
 static void
 parse_delay_pool_rates(DelayConfig * cfg)
 {
     cfg->parsePoolRates();
 }
 
 static void
 parse_delay_pool_access(DelayConfig * cfg)
 {
     cfg->parsePoolAccess(LegacyParser);
 }
 
 #endif
 
+#if DELAY_POOLS
+#include "ClientDelayConfig.h"
+/* do nothing - free_client_delay_pool_count is the magic free function.
+ * this is why client_delay_pool_count isn't just marked TYPE: ushort
+ */
+
+#define free_client_delay_pool_access(X)
+#define free_client_delay_pool_rates(X)
+#define dump_client_delay_pool_access(X, Y, Z)
+#define dump_client_delay_pool_rates(X, Y, Z)
+
+static void
+free_client_delay_pool_count(ClientDelayConfig * cfg)
+{
+    cfg->freePoolCount();
+}
+
+static void
+dump_client_delay_pool_count(StoreEntry * entry, const char *name, ClientDelayConfig &cfg)
+{
+    cfg.dumpPoolCount (entry, name);
+}
+
+static void
+parse_client_delay_pool_count(ClientDelayConfig * cfg)
+{
+    cfg->parsePoolCount();
+}
+
+static void
+parse_client_delay_pool_rates(ClientDelayConfig * cfg)
+{
+    cfg->parsePoolRates();
+}
+
+static void
+parse_client_delay_pool_access(ClientDelayConfig * cfg)
+{
+    cfg->parsePoolAccess(LegacyParser);
+}
+#endif
+
 #if USE_HTTP_VIOLATIONS
 static void
 dump_http_header_access(StoreEntry * entry, const char *name, header_mangler header[])
 {
     int i;
 
     for (i = 0; i < HDR_ENUM_END; i++) {
         if (header[i].access_list != NULL) {
             storeAppendPrintf(entry, "%s ", name);
             dump_acl_access(entry, httpHeaderNameById(i),
                             header[i].access_list);
         }
     }
 }
 
 static void
 parse_http_header_access(header_mangler header[])
 {
     int id, i;
     char *t = NULL;

=== modified file 'src/cf.data.depend'
--- src/cf.data.depend	2010-10-06 03:50:45 +0000
+++ src/cf.data.depend	2010-10-11 15:47:38 +0000
@@ -2,40 +2,43 @@
 access_log		acl	logformat
 acl			external_acl_type auth_param
 acl_access		acl
 acl_address		acl
 acl_b_size_t		acl
 acl_tos			acl
 acl_nfmark		acl
 address
 authparam
 b_int64_t
 b_size_t
 cachedir		cache_replacement_policy
 cachemgrpasswd
 ConfigAclTos
 CpuAffinityMap
 debug
 delay_pool_access	acl	delay_class
 delay_pool_class	delay_pools
 delay_pool_count
 delay_pool_rates	delay_class
+client_delay_pool_access	acl
+client_delay_pool_count
+client_delay_pool_rates
 denyinfo		acl
 eol
 externalAclHelper	auth_param
 HelperChildConfig
 hostdomain		cache_peer
 hostdomaintype		cache_peer
 http_header_access	acl
 http_header_replace
 http_port_list
 https_port_list
 adaptation_access_type	adaptation_service_set adaptation_service_chain acl icap_service icap_class
 adaptation_service_set_type	icap_service ecap_service
 adaptation_service_chain_type	icap_service ecap_service
 icap_access_type	icap_class acl
 icap_class_type		icap_service
 icap_service_type
 icap_service_failure_limit
 ecap_service_type
 int
 kb_int64_t

=== modified file 'src/cf.data.pre'
--- src/cf.data.pre	2010-10-06 03:50:45 +0000
+++ src/cf.data.pre	2010-10-11 15:47:38 +0000
@@ -4778,40 +4778,135 @@
 	be limited to 128Kb no matter how many workstations they are logged into.:
 
 delay_parameters 4 32000/32000 8000/8000 600/64000 16000/16000
 DOC_END
 
 NAME: delay_initial_bucket_level
 COMMENT: (percent, 0-100)
 TYPE: ushort
 DEFAULT: 50
 IFDEF: DELAY_POOLS
 LOC: Config.Delay.initial
 DOC_START
 	The initial bucket percentage is used to determine how much is put
 	in each bucket when squid starts, is reconfigured, or first notices
 	a host accessing it (in class 2 and class 3, individual hosts and
 	networks only have buckets associated with them once they have been
 	"seen" by squid).
 DOC_END
 
 COMMENT_START
+ CLIENT DELAY POOL PARAMETERS
+ -----------------------------------------------------------------------------
+COMMENT_END
+
+NAME: client_delay_pools
+TYPE: client_delay_pool_count
+DEFAULT: 0
+IFDEF: DELAY_POOLS
+LOC: Config.ClientDelay
+DOC_START
+	This option specifies the number of client delay pools used. It must
+	precede other client_delay_* options.
+
+Example:
+ client_delay_pools 2
+DOC_END
+
+NAME: client_delay_initial_bucket_level
+COMMENT: (percent, 0-no_limit)
+TYPE: ushort
+DEFAULT: 50
+IFDEF: DELAY_POOLS
+LOC: Config.ClientDelay.initial
+DOC_START
+	This option determines the initial bucket size as a percentage of
+	max_bucket_size from client_delay_parameters. Buckets are created
+	at the time of the "first" connection from the matching IP. Idle
+	buckets are periodically deleted.
+
+	You can specify more than 100 percent but note that such "oversized"
+	buckets are not refilled until their size goes down to max_bucket_size
+	from client_delay_parameters.
+
+Example:
+ client_delay_initial_bucket_level 50
+DOC_END
+
+NAME: client_delay_parameters
+TYPE: client_delay_pool_rates
+DEFAULT: none
+IFDEF: DELAY_POOLS
+LOC: Config.ClientDelay
+DOC_START
+
+	This option configures client-side bandwidth limits using the
+	following format:
+
+	    client_delay_parameters pool speed_limit max_bucket_size
+
+	pool is an integer ID used for client_delay_access matching.
+
+	speed_limit is bytes added to the bucket per second.
+
+	max_bucket_size is the maximum size of a bucket, enforced after any
+	speed_limit additions.
+
+	Please see the delay_parameters option for more information and
+	examples.
+
+Example:
+ client_delay_parameters 1 1024 2048
+ client_delay_parameters 2 51200 16384
+DOC_END
+
+NAME: client_delay_access
+TYPE: client_delay_pool_access
+DEFAULT: none
+IFDEF: DELAY_POOLS
+LOC: Config.ClientDelay
+DOC_START
+
+	This option determines the client-side delay pool for the
+	request:
+
+	    client_delay_access pool_ID allow|deny acl_name
+
+	All client_delay_access options are checked in their pool ID
+	order, starting with pool 1. The first pool whose access rules
+	allow the request is selected for the request. If no ACL matches
+	or there are no client_delay_access options, the request bandwidth
+	is not limited.
+
+	The ACL-selected pool is then used to find the
+	client_delay_parameters for the request. Client-side pools are
+	not used to aggregate clients. Clients are always aggregated
+	based on their source IP addresses (one bucket per source IP).
+
+	Please see delay_access for more examples.
+
+Example:
+ client_delay_access 1 allow low_rate_network
+ client_delay_access 2 allow vips_network
+DOC_END
+
+COMMENT_START
  WCCPv1 AND WCCPv2 CONFIGURATION OPTIONS
  -----------------------------------------------------------------------------
 COMMENT_END
 
 NAME: wccp_router
 TYPE: address
 LOC: Config.Wccp.router
 DEFAULT: any_addr
 IFDEF: USE_WCCP
 DOC_START
 	Use this option to define your WCCP ``home'' router for
 	Squid.
 
 	wccp_router supports a single WCCP(v1) router
 
 	wccp2_router supports multiple WCCPv2 routers
 
 	only one of the two may be used at the same time and defines
 	which version of WCCP to use.
 DOC_END

=== modified file 'src/client_db.cc'
--- src/client_db.cc	2010-07-25 08:10:12 +0000
+++ src/client_db.cc	2010-10-11 16:04:11 +0000
@@ -32,86 +32,169 @@
  *
  */
 
 #include "squid.h"
 #include "event.h"
 #include "CacheManager.h"
 #include "ClientInfo.h"
 #include "ip/Address.h"
 #include "SquidMath.h"
 #include "SquidTime.h"
 #include "Store.h"
 
 
 static hash_table *client_table = NULL;
 
 static ClientInfo *clientdbAdd(const Ip::Address &addr);
 static FREE clientdbFreeItem;
 static void clientdbStartGC(void);
 static void clientdbScheduledGC(void *);
 
+#if DELAY_POOLS
+static int max_clients = 32768;
+#else
 static int max_clients = 32;
+#endif
+
 static int cleanup_running = 0;
 static int cleanup_scheduled = 0;
 static int cleanup_removed;
 
+#if DELAY_POOLS
+#define CLIENT_DB_HASH_SIZE 65357
+#else
 #define CLIENT_DB_HASH_SIZE 467
+#endif
 
 static ClientInfo *
 
 clientdbAdd(const Ip::Address &addr)
 {
     ClientInfo *c;
     char *buf = new char[MAX_IPSTRLEN];
     c = (ClientInfo *)memAllocate(MEM_CLIENT_INFO);
     c->hash.key = addr.NtoA(buf,MAX_IPSTRLEN);
     c->addr = addr;
+#if DELAY_POOLS
+    /* setup default values for client write limiter */
+    c->writeLimitingActive=false;
+    c->writeSpeedLimit=0;
+    c->bucketSize = 0;
+    c->firstTimeConnection=true;
+    c->quotaQueue = NULL;
+    c->rationedQuota = 0;
+    c->rationedCount = 0;
+    c->selectWaiting = false;
+    c->eventWaiting = false;
+
+    /* get current time */
+    getCurrentTime();
+    c->prevTime=current_dtime;/* put current time to have something sensible here */
+#endif
     hash_join(client_table, &c->hash);
     statCounter.client_http.clients++;
 
     if ((statCounter.client_http.clients > max_clients) && !cleanup_running && cleanup_scheduled < 2) {
         cleanup_scheduled++;
         eventAdd("client_db garbage collector", clientdbScheduledGC, NULL, 90, 0);
     }
 
     return c;
 }
 
+#if DELAY_POOLS
+/* Configure client write limiting for this client (note: "client" here means IP).
+   info must be obtained using clientdbGetInfo (so we avoid another hash lookup).
+   writeSpeedLimit is the speed limit configured in the config for this pool.
+   initialBurst is the initial bucket size for this client (i.e. the client can burst at first);
+   it is applied only the first time the limiter is set up for this client
+   (the caller in client_side.cc derives it from the pool highwatermark and Config.ClientDelay.initial).
+   highWatermark is the maximum bucket value. */
+void clientdbSetWriteLimiter(ClientInfo * info, const int writeSpeedLimit,const double initialBurst,const double highWatermark)
+{
+    assert(info);
+    debugs(33,5, HERE << "Write limits for " << (const char*)info->hash.key << 
+        " speed=" << writeSpeedLimit << " burst=" << initialBurst <<
+        " highwatermark=" << highWatermark);
+
+    // set or possibly update traffic shaping parameters
+    info->writeLimitingActive = true;
+    info->writeSpeedLimit = writeSpeedLimit;
+    info->bucketSizeLimit = highWatermark;
+
+    // but some members should only be set once for a newly activated bucket
+    if (info->firstTimeConnection) {
+        info->firstTimeConnection = false;
+
+        assert(!info->selectWaiting);
+        assert(!info->quotaQueue);
+        info->quotaQueue = new CommQuotaQueue(info);
+        cbdataReference(info->quotaQueue); // reference is dropped in clientdbFreeItem()
+
+        info->bucketSize = initialBurst;
+        info->prevTime = current_dtime;
+    }
+}
+#endif
+
 static void
 clientdbRegisterWithCacheManager(void)
 {
     CacheManager::GetInstance()->
     registerAction("client_list", "Cache Client List", clientdbDump, 0, 1);
 }
 
 void
 clientdbInit(void)
 {
     clientdbRegisterWithCacheManager();
 
     if (client_table)
         return;
 
     client_table = hash_create((HASHCMP *) strcmp, CLIENT_DB_HASH_SIZE, hash_string);
-
 }
 
+#if DELAY_POOLS
+/* Looks up the client db record for the given IP address.
+   Yields NULL when the client db is turned off or holds no such client.
+   (clientdbEstablished is assumed to have been called earlier, creating the record if needed)
+*/
+ClientInfo * clientdbGetInfo(const Ip::Address &addr)
+{
+    // without a client db there is nothing to search
+    if (!Config.onoff.client_db)
+        return NULL;
+
+    // records are keyed by the textual form of the address
+    char ipText[MAX_IPSTRLEN];
+    addr.NtoA(ipText, MAX_IPSTRLEN);
+
+    ClientInfo *const found = (ClientInfo *) hash_lookup(client_table, ipText);
+
+    if (!found) {
+        debugs(77,1,"Client db does not contain information for given IP address "<<(const char*)ipText);
+    }
+
+    return found;
+}
+#endif
 void
 clientdbUpdate(const Ip::Address &addr, log_type ltype, protocol_t p, size_t size)
 {
     char key[MAX_IPSTRLEN];
     ClientInfo *c;
 
     if (!Config.onoff.client_db)
         return;
 
     addr.NtoA(key,MAX_IPSTRLEN);
 
     c = (ClientInfo *) hash_lookup(client_table, key);
 
     if (c == NULL)
         c = clientdbAdd(addr);
 
     if (c == NULL)
         debug_trap("clientdbUpdate: Failed to add entry");
 
     if (p == PROTO_HTTP) {
@@ -282,40 +365,49 @@
                               log_tags[l],
                               c->Http.result_hist[l],
                               Math::intPercent(c->Http.result_hist[l], c->Http.n_requests));
         }
 
         storeAppendPrintf(sentry, "\n");
     }
 
     storeAppendPrintf(sentry, "TOTALS\n");
     storeAppendPrintf(sentry, "ICP : %d Queries, %d Hits (%3d%%)\n",
                       icp_total, icp_hits, Math::intPercent(icp_hits, icp_total));
     storeAppendPrintf(sentry, "HTTP: %d Requests, %d Hits (%3d%%)\n",
                       http_total, http_hits, Math::intPercent(http_hits, http_total));
 }
 
 static void
 clientdbFreeItem(void *data)
 {
     ClientInfo *c = (ClientInfo *)data;
     safe_free(c->hash.key);
+
+#if DELAY_POOLS
+    if (CommQuotaQueue *q = c->quotaQueue) {
+        q->clientInfo = NULL;
+        delete q; // invalidates cbdata, cancelling any pending kicks
+        cbdataReferenceDone(q);
+	}
+#endif
+
     memFree(c, MEM_CLIENT_INFO);
 }
 
 void
 clientdbFreeMemory(void)
 {
     hashFreeItems(client_table, clientdbFreeItem);
     hashFreeMemory(client_table);
     client_table = NULL;
 }
 
 static void
 clientdbScheduledGC(void *unused)
 {
     cleanup_scheduled = 0;
     clientdbStartGC();
 }
 
 static void
 clientdbGC(void *unused)

=== modified file 'src/client_side.cc'
--- src/client_side.cc	2010-10-01 00:41:19 +0000
+++ src/client_side.cc	2010-10-11 15:47:38 +0000
@@ -94,40 +94,44 @@
 #include "comm.h"
 #include "comm/ListenStateData.h"
 #include "base/TextException.h"
 #include "ConnectionDetail.h"
 #include "eui/Config.h"
 #include "fde.h"
 #include "HttpHdrContRange.h"
 #include "HttpReply.h"
 #include "HttpRequest.h"
 #include "ident/Config.h"
 #include "ident/Ident.h"
 #include "ip/Intercept.h"
 #include "ipc/StartListening.h"
 #include "MemBuf.h"
 #include "MemObject.h"
 #include "ProtoPort.h"
 #include "rfc1738.h"
 #include "SquidTime.h"
 #include "Store.h"
 
+#if DELAY_POOLS
+#include "ClientInfo.h"
+#endif
+
 #if LINGERING_CLOSE
 #define comm_close comm_lingering_close
 #endif
 
 /// dials clientHttpConnectionOpened or clientHttpsConnectionOpened call
 class ListeningStartedDialer: public CallDialer, public Ipc::StartListeningCb
 {
 public:
     typedef void (*Handler)(int fd, int errNo, http_port_list *portCfg);
     ListeningStartedDialer(Handler aHandler, http_port_list *aPortCfg):
             handler(aHandler), portCfg(aPortCfg) {}
 
     virtual void print(std::ostream &os) const {
         startPrint(os) <<
         ", port=" << (void*)portCfg << ')';
     }
 
     virtual bool canDial(AsyncCall &) const { return true; }
     virtual void dial(AsyncCall &) { (handler)(fd, errNo, portCfg); }
 
@@ -3125,40 +3129,82 @@
 #endif
 
 #if USE_SQUID_EUI
     if (Eui::TheConfig.euiLookup) {
         if (details->peer.IsIPv4()) {
             connState->peer_eui48.lookup(details->peer);
         } else if (details->peer.IsIPv6()) {
             connState->peer_eui64.lookup(details->peer);
         }
     }
 #endif
 
     if (s->tcp_keepalive.enabled) {
         commSetTcpKeepalive(newfd, s->tcp_keepalive.idle, s->tcp_keepalive.interval, s->tcp_keepalive.timeout);
     }
 
     connState->readSomeData();
 
     clientdbEstablished(details->peer, 1);
 
+#if DELAY_POOLS
+    fd_table[newfd].clientInfo = NULL;
+
+    ClientDelayPools & pools(Config.ClientDelay.pools);
+    if (Config.onoff.client_db)
+    {
+        /* it was said several times that client write limiter does not work if client_db is disabled */
+
+        for (unsigned int pool = 0; pool < pools.size(); pool++) {
+
+            /* pools require explicit 'allow' to assign a client into them */
+            if (!pools[pool].access)
+                continue; // warned in ClientDelayConfig::Finalize()
+
+            ACLFilledChecklist ch(pools[pool].access, NULL, NULL);
+
+            // TODO: we check early to limit error response bandwidth, but we
+            // should recheck when we can honor delay_pool_uses_indirect
+
+            ch.src_addr = details->peer;
+            ch.my_addr = details->me;
+
+            if (ch.fastCheck()) {
+
+                /*  request client information from db after we did all checks
+                    this will save hash lookup if client failed checks */
+                ClientInfo * cli = clientdbGetInfo(details->peer);
+                assert(cli);
+
+                /* put client info in FDE */
+                fd_table[newfd].clientInfo = cli;
+
+                /* setup write limiter for this request */
+                const double burst = floor(0.5 +
+                    (pools[pool].highwatermark * Config.ClientDelay.initial)/100.0);
+                clientdbSetWriteLimiter(cli, pools[pool].rate,
+                    burst, pools[pool].highwatermark);
+                break;
+            }
+        }
+    }
+#endif
     incoming_sockets_accepted++;
 }
 
 #if USE_SSL
 
 /** Create SSL connection structure and update fd_table */
 static SSL *
 httpsCreate(int newfd, ConnectionDetail *details, SSL_CTX *sslContext)
 {
     SSL *ssl = SSL_new(sslContext);
 
     if (!ssl) {
         const int ssl_error = ERR_get_error();
         debugs(83, 1, "httpsAccept: Error allocating handle: " << ERR_error_string(ssl_error, NULL)  );
         comm_close(newfd);
         return NULL;
     }
 
     SSL_set_fd(ssl, newfd);
     fd_table[newfd].ssl = ssl;

=== modified file 'src/comm.cc'
--- src/comm.cc	2010-10-06 03:50:45 +0000
+++ src/comm.cc	2010-10-13 17:45:11 +0000
@@ -36,74 +36,87 @@
 #include "StoreIOBuffer.h"
 #include "comm.h"
 #include "event.h"
 #include "fde.h"
 #include "comm/AcceptLimiter.h"
 #include "comm/comm_internal.h"
 #include "comm/ListenStateData.h"
 #include "CommIO.h"
 #include "CommRead.h"
 #include "ConnectionDetail.h"
 #include "MemBuf.h"
 #include "pconn.h"
 #include "SquidTime.h"
 #include "CommCalls.h"
 #include "DescriptorSet.h"
 #include "icmp/net_db.h"
 #include "ip/Address.h"
 #include "ip/Intercept.h"
 #include "ip/QosConfig.h"
 #include "ip/tools.h"
+#include "ClientInfo.h"
 
+#include "cbdata.h"
 #if defined(_SQUID_CYGWIN_)
 #include <sys/ioctl.h>
 #endif
 #ifdef HAVE_NETINET_TCP_H
 #include <netinet/tcp.h>
 #endif
 
 /*
  * New C-like simple comm code. This stuff is a mess and doesn't really buy us anything.
  */
 
 typedef enum {
     IOCB_NONE,
     IOCB_READ,
     IOCB_WRITE
 } iocb_type;
 
 static void commStopHalfClosedMonitor(int fd);
 static IOCB commHalfClosedReader;
 static void comm_init_opened(int new_socket, Ip::Address &addr, tos_t tos, nfmark_t nfmark, const char *note, struct addrinfo *AI);
 static int comm_apply_flags(int new_socket, Ip::Address &addr, int flags, struct addrinfo *AI);
 
+#if DELAY_POOLS
+CBDATA_CLASS_INIT(CommQuotaQueue);
+
+static void commHandleWriteHelper(void * data);
+#endif
+
+static void commSelectOrQueueWrite(const int fd);
 
 struct comm_io_callback_t {
     iocb_type type;
     int fd;
     AsyncCall::Pointer callback;
     char *buf;
     FREE *freefunc;
     int size;
     int offset;
     comm_err_t errcode;
     int xerrno;
+#if DELAY_POOLS
+    unsigned int quotaQueueReserv; ///< reservation ID from CommQuotaQueue
+#endif
+
 
     bool active() const { return callback != NULL; }
 };
 
 struct _comm_fd {
     int fd;
     comm_io_callback_t	readcb;
     comm_io_callback_t	writecb;
 };
 typedef struct _comm_fd comm_fd_t;
 comm_fd_t *commfd_table;
 
 // TODO: make this a comm_io_callback_t method?
 bool
 commio_has_callback(int fd, iocb_type type, comm_io_callback_t *ccb)
 {
     assert(ccb->fd == fd);
     assert(ccb->type == type);
     return ccb->active();
 }
@@ -124,45 +137,50 @@
                     AsyncCall::Pointer &cb, char *buf, FREE *freefunc, int size)
 {
     assert(!ccb->active());
     assert(ccb->type == type);
     assert(cb != NULL);
     ccb->fd = fd;
     ccb->callback = cb;
     ccb->buf = buf;
     ccb->freefunc = freefunc;
     ccb->size = size;
     ccb->offset = 0;
 }
 
 
 // Schedule the callback call and clear the callback
 static void
 commio_finish_callback(int fd, comm_io_callback_t *ccb, comm_err_t code, int xerrno)
 {
     debugs(5, 3, "commio_finish_callback: called for FD " << fd << " (" <<
            code << ", " << xerrno << ")");
+
     assert(ccb->active());
     assert(ccb->fd == fd);
     ccb->errcode = code;
     ccb->xerrno = xerrno;
 
+#if DELAY_POOLS
+    ccb->quotaQueueReserv = 0;
+#endif
+
     comm_io_callback_t cb = *ccb;
 
     /* We've got a copy; blow away the real one */
     /* XXX duplicate code from commio_cancel_callback! */
     ccb->xerrno = 0;
     ccb->callback = NULL; // cb has it
 
     /* free data */
     if (cb.freefunc) {
         cb.freefunc(cb.buf);
         cb.buf = NULL;
     }
 
     if (cb.callback != NULL) {
         typedef CommIoCbParams Params;
         Params &params = GetCommParams<Params>(cb.callback);
         params.fd = cb.fd;
         params.buf = cb.buf;
         params.size = cb.offset;
         params.flag = cb.errcode;
@@ -170,40 +188,44 @@
         ScheduleCallHere(cb.callback);
     }
 }
 
 
 /*
  * Cancel the given callback
  *
  * Remember that the data is cbdataRef'ed.
  */
 // TODO: make this a comm_io_callback_t method
 static void
 commio_cancel_callback(int fd, comm_io_callback_t *ccb)
 {
     debugs(5, 3, "commio_cancel_callback: called for FD " << fd);
     assert(ccb->fd == fd);
     assert(ccb->active());
 
     ccb->xerrno = 0;
     ccb->callback = NULL;
+
+#if DELAY_POOLS
+    ccb->quotaQueueReserv = 0;
+#endif
 }
 
 /*
  * Call the given comm callback; assumes the callback is valid.
  *
  * @param ccb		io completion callback
  */
 void
 commio_call_callback(comm_io_callback_t *ccb)
 {
 }
 
 class ConnectStateData
 {
 
 public:
     void *operator new (size_t);
     void operator delete (void *);
     static void Connect (int fd, void *me);
     void connect();
@@ -1572,40 +1594,50 @@
     typedef CommCloseCbParams Params;
     Params &startParams = GetCommParams<Params>(startCall);
     startParams.fd = fd;
     ScheduleCallHere(startCall);
 
     // a half-closed fd may lack a reader, so we stop monitoring explicitly
     if (commHasHalfClosedMonitor(fd))
         commStopHalfClosedMonitor(fd);
     commSetTimeout(fd, -1, NULL, NULL);
 
     // notify read/write handlers after canceling select reservations, if any
     if (commio_has_callback(fd, IOCB_WRITE, COMMIO_FD_WRITECB(fd))) {
         commSetSelect(fd, COMM_SELECT_WRITE, NULL, NULL, 0);
         commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), COMM_ERR_CLOSING, errno);
     }
     if (commio_has_callback(fd, IOCB_READ, COMMIO_FD_READCB(fd))) {
         commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
         commio_finish_callback(fd, COMMIO_FD_READCB(fd), COMM_ERR_CLOSING, errno);
     }
 
+#if DELAY_POOLS
+    if (ClientInfo *clientInfo = F->clientInfo) {
+        if (clientInfo->selectWaiting) {
+            clientInfo->selectWaiting = false;
+            // kick queue or it will get stuck as commHandleWrite is not called
+            clientInfo->kickQuotaQueue();
+        }
+     }
+#endif
+
     commCallCloseHandlers(fd);
 
     if (F->pconn.uses)
         F->pconn.pool->count(F->pconn.uses);
 
     comm_empty_os_read_buffers(fd);
 
 
     AsyncCall::Pointer completeCall=commCbCall(5,4, "comm_close_complete",
                                     CommCloseCbPtrFun(comm_close_complete, NULL));
     Params &completeParams = GetCommParams<Params>(completeCall);
     completeParams.fd = fd;
     // must use async call to wait for all callbacks
     // scheduled before comm_close() to finish
     ScheduleCallHere(completeCall);
 
     PROF_stop(comm_close);
 }
 
 /* Send a udp datagram to specified TO_ADDR. */
@@ -1918,137 +1950,408 @@
      * after accepting a client but before it opens a socket or a file.
      * Since Squid_MaxFD can be as high as several thousand, don't waste them */
     RESERVED_FD = min(100, Squid_MaxFD / 4);
 
     conn_close_pool = memPoolCreate("close_handler", sizeof(close_handler));
 
     TheHalfClosed = new DescriptorSet;
 }
 
 void
 comm_exit(void)
 {
     delete TheHalfClosed;
     TheHalfClosed = NULL;
 
     safe_free(fd_table);
     safe_free(fdd_table);
     safe_free(commfd_table);
 }
 
+#if DELAY_POOLS
+// event callback (scheduled by ClientInfo::kickQuotaQueue with the CommQuotaQueue as data); called when the queue is done waiting for the client bucket to fill
+static void
+commHandleWriteHelper(void * data)
+{
+    CommQuotaQueue *queue = static_cast<CommQuotaQueue*>(data);
+    assert(queue);
+
+    ClientInfo *clientInfo = queue->clientInfo;
+    // ClientInfo invalidates queue if freed, so if we got here through
+    // eventAdd cbdata protections, everything should be valid and consistent
+    assert(clientInfo); 
+    assert(clientInfo->hasQueue());
+    assert(clientInfo->hasQueue(queue));
+    assert(!clientInfo->selectWaiting);
+    assert(clientInfo->eventWaiting);
+    clientInfo->eventWaiting = false;
+
+    do {
+        // check that the head descriptor is still relevant: same client, same reservation, not closing
+        const int head = clientInfo->quotaPeekFd();
+        comm_io_callback_t *ccb = COMMIO_FD_WRITECB(head);
+
+        if (fd_table[head].clientInfo == clientInfo &&
+            clientInfo->quotaPeekReserv() == ccb->quotaQueueReserv &&
+            !fd_table[head].closing()) {
+
+            // wait for the head descriptor to become ready for writing; it stays queued until commHandleWrite runs
+            commSetSelect(head, COMM_SELECT_WRITE, commHandleWrite, ccb, 0);
+            clientInfo->selectWaiting = true;
+            return;
+         }
+
+         clientInfo->quotaDequeue(); // remove the no longer relevant descriptor
+         // and continue looking for a relevant one
+    } while (clientInfo->hasQueue());
+
+    debugs(77,3, HERE << "emptied queue");
+}
+
+bool
+ClientInfo::hasQueue() const // whether at least one descriptor is waiting in the quota queue
+{
+    assert(quotaQueue);
+    return !quotaQueue->empty();
+}
+
+bool
+ClientInfo::hasQueue(const CommQuotaQueue *q) const // whether q is this client's own quota queue
+{
+    assert(quotaQueue);
+    return quotaQueue == q;
+}
+
+/// returns the descriptor at the front of the quota queue, i.e. the next one to be dequeued
+int
+ClientInfo::quotaPeekFd() const
+{
+    assert(quotaQueue);
+    return quotaQueue->front();
+}
+
+/// returns the reservation ID of the first descriptor to be dequeued
+unsigned int
+ClientInfo::quotaPeekReserv() const
+{
+    assert(quotaQueue);
+    return quotaQueue->outs + 1; // IDs are issued as ++ins and retired as ++outs, so the head holds outs+1
+}
+
+/// queues a given fd; the queue must already exist (asserted, not created here); returns reservation ID
+unsigned int
+ClientInfo::quotaEnqueue(int fd)
+{
+    assert(quotaQueue);
+    return quotaQueue->enqueue(fd);
+}
+
+/// removes the descriptor at the head of the quota queue; the queue must exist (asserted)
+void
+ClientInfo::quotaDequeue()
+{
+    assert(quotaQueue);
+    quotaQueue->dequeue();
+}
+
+void
+ClientInfo::kickQuotaQueue() // schedules commHandleWriteHelper unless an event or select is already pending or the queue is empty
+{
+    if (!eventWaiting && !selectWaiting && hasQueue()) {
+        // wait a second if the bucket holds less than one byte; otherwise schedule with zero delay
+        const double delay = (bucketSize < 1.0) ? 1.0 : 0.0;
+        eventAdd("commHandleWriteHelper", &commHandleWriteHelper,
+            quotaQueue, delay, 0, true);
+        eventWaiting = true;
+    }
+}
+
+/// calculates how many bytes a single dequeued client may write now; the result may be zero
+int
+ClientInfo::quotaForDequed()
+{
+    /* If we have multiple clients and give full bucketSize to each client then
+     * clt1 may often get a lot more because clt1->clt2 time distance in the
+     * select(2) callback order may be a lot smaller than cltN->clt1 distance.
+     * We divide quota evenly to be more fair. */
+
+    if (!rationedCount) {
+        rationedCount = quotaQueue->size() + 1; // those still queued plus the one dequeued by our caller
+
+        // The delay in ration recalculation _temporarily_ deprives clients of
+        // bytes that should have trickled in while rationedCount was positive.
+        refillBucket();
+
+        // Rounding errors do not accumulate here, but we round down to avoid
+        // negative bucket sizes after write with rationedCount=1.
+        rationedQuota = static_cast<int>(floor(bucketSize/rationedCount));
+        debugs(77,5, HERE << "new rationedQuota: " << rationedQuota <<
+            '*' << rationedCount);
+    }
+
+    --rationedCount;
+    debugs(77,7, HERE << "rationedQuota: " << rationedQuota <<
+        " rations remaining: " << rationedCount);
+
+    // update 'last seen' time to prevent clientdb GC from dropping us
+    last_seen = squid_curtime;
+    return rationedQuota;
+}
+
+/// adds bytes to the quota bucket based on the rate (writeSpeedLimit) and the time passed since prevTime
+void
+ClientInfo::refillBucket()
+{
+    // all these times are in seconds, with double precision
+    const double currTime = current_dtime;
+    const double timePassed = currTime - prevTime;
+
+    // Calculate allowance for the time passed. Use double to avoid
+    // accumulating rounding errors for small intervals. For example, always
+    // adding 1 byte instead of 1.4 results in 29% bandwidth allocation error.
+    const double gain = timePassed * writeSpeedLimit;
+
+    debugs(77,5, HERE << currTime << " clt" << (const char*)hash.key << ": " <<
+        bucketSize << " + (" << timePassed << " * " << writeSpeedLimit <<
+        " = " << gain << ')');
+
+    // to further combat error accumulation during micro updates,
+    // quit before updating time if we cannot add at least one byte
+    if (gain < 1.0)
+       return;
+
+    prevTime = currTime;
+
+    // for "first" connections, drain initial fat before refilling but keep
+    // updating prevTime to avoid bursts after the fat is gone
+    if (bucketSize > bucketSizeLimit) {
+       debugs(77,4, HERE << "not refilling while draining initial fat");
+       return;
+    }
+
+    bucketSize += gain;
+
+    // obey quota limits: never hold more than bucketSizeLimit
+    if (bucketSize > bucketSizeLimit)
+        bucketSize = bucketSizeLimit;
+}
+
+CommQuotaQueue::CommQuotaQueue(ClientInfo *info): clientInfo(info),
+    ins(0), outs(0) // counters of enqueue() and dequeue() calls, both start at zero
+{
+    assert(clientInfo); // a queue always starts attached to a ClientInfo
+}
+
+CommQuotaQueue::~CommQuotaQueue()
+{
+    assert(!clientInfo); // ClientInfo should reset our clientInfo pointer before destroying us
+}
+
+/// places the given fd at the end of the queue; returns its reservation ID (the incremented ins counter)
+unsigned int
+CommQuotaQueue::enqueue(int fd)
+{
+    debugs(77,5, HERE << "clt" << (const char*)clientInfo->hash.key <<
+        ": FD " << fd << " with qqid" << (ins+1) << ' ' << fds.size());
+    fds.push_back(fd);
+    return ++ins;
+}
+
+/// removes queue head and advances the outs counter; the queue must not be empty (asserted)
+void
+CommQuotaQueue::dequeue()
+{
+    assert(!fds.empty());
+    debugs(77,5, HERE << "clt" << (const char*)clientInfo->hash.key <<
+        ": FD " << fds.front() << " with qqid" << (outs+1) << ' ' <<
+        fds.size());
+    fds.shift();
+    ++outs;
+}
+
+#endif
+
 /* Write to FD. */
 static void
 commHandleWrite(int fd, void *data)
 {
     comm_io_callback_t *state = (comm_io_callback_t *)data;
     int len = 0;
     int nleft;
 
-    assert(state == COMMIO_FD_WRITECB(fd));
+    assert(state==COMMIO_FD_WRITECB(fd));
 
     PROF_start(commHandleWrite);
     debugs(5, 5, "commHandleWrite: FD " << fd << ": off " <<
            (long int) state->offset << ", sz " << (long int) state->size << ".");
 
     nleft = state->size - state->offset;
+
+#if DELAY_POOLS
+    ClientInfo * clientInfo=fd_table[fd].clientInfo;
+
+    if (clientInfo && !clientInfo->writeLimitingActive)
+        clientInfo = NULL; // we only care about quota limits here
+
+    if (clientInfo) {
+        assert(clientInfo->selectWaiting); // set by commHandleWriteHelper when it armed this select
+        clientInfo->selectWaiting = false;
+
+        assert(clientInfo->hasQueue());
+        assert(clientInfo->quotaPeekFd() == fd);
+        clientInfo->quotaDequeue(); // we will write or requeue below
+
+        if (nleft > 0) {
+            const int quota = clientInfo->quotaForDequed();
+            if (!quota) {  // if no write quota left, queue this fd again and retry later
+                state->quotaQueueReserv = clientInfo->quotaEnqueue(fd);
+                clientInfo->kickQuotaQueue();
+                PROF_stop(commHandleWrite);
+                return;
+            }
+
+            const int nleft_corrected = min(nleft, quota); // never write more than our ration
+            if (nleft != nleft_corrected) {
+                debugs(5, 5, HERE << "FD " << fd << " writes only " <<
+                    nleft_corrected << " out of " << nleft);
+                nleft = nleft_corrected;
+            }
+
+        }
+    }
+
+#endif
+
+    /* actually WRITE data */
     len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
     debugs(5, 5, "commHandleWrite: write() returns " << len);
+
+#if DELAY_POOLS
+    if (clientInfo) {
+        if (len > 0) {
+            /* we wrote data - drain the written bytes from the bucket */
+            clientInfo->bucketSize -= len;
+            if (clientInfo->bucketSize < 0.0)
+            {
+                debugs(5,1, HERE << "drained too much"); // should not happen
+                clientInfo->bucketSize = 0;
+            }
+         }
+
+         // even if we wrote nothing, we were served; give others a chance
+         clientInfo->kickQuotaQueue();
+    }
+#endif
+
     fd_bytes(fd, len, FD_WRITE);
     statCounter.syscalls.sock.writes++;
     // After each successful partial write,
     // reset fde::writeStart to the current time.
     fd_table[fd].writeStart = squid_curtime;
 
     if (len == 0) {
         /* Note we even call write if nleft == 0 */
         /* We're done */
 
         if (nleft != 0)
             debugs(5, 1, "commHandleWrite: FD " << fd << ": write failure: connection closed with " << nleft << " bytes remaining.");
 
         commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno);
     } else if (len < 0) {
         /* An error */
 
         if (fd_table[fd].flags.socket_eof) {
             debugs(50, 2, "commHandleWrite: FD " << fd << ": write failure: " << xstrerror() << ".");
             commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno);
         } else if (ignoreErrno(errno)) {
             debugs(50, 10, "commHandleWrite: FD " << fd << ": write failure: " << xstrerror() << ".");
-            commSetSelect(fd,
-                          COMM_SELECT_WRITE,
-                          commHandleWrite,
-                          state,
-                          0);
+            commSelectOrQueueWrite(fd);
         } else {
             debugs(50, 2, "commHandleWrite: FD " << fd << ": write failure: " << xstrerror() << ".");
             commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno);
         }
     } else {
         /* A successful write, continue */
         state->offset += len;
 
         if (state->offset < state->size) {
             /* Not done, reinstall the write handler and write some more */
-            commSetSelect(fd,
-                          COMM_SELECT_WRITE,
-                          commHandleWrite,
-                          state,
-                          0);
+            commSelectOrQueueWrite(fd);
         } else {
             commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_OK : COMM_ERROR, errno);
         }
     }
 
     PROF_stop(commHandleWrite);
 }
 
 /*
  * Queue a write. handler/handler_data are called when the write
  * completes, on error, or on file descriptor close.
  *
  * free_func is used to free the passed buffer when the write has completed.
  */
 void
 comm_write(int fd, const char *buf, int size, IOCB * handler, void *handler_data, FREE * free_func)
 {
     AsyncCall::Pointer call = commCbCall(5,5, "SomeCommWriteHander",
                                          CommIoCbPtrFun(handler, handler_data));
 
     comm_write(fd, buf, size, call, free_func);
 }
 
 void
 comm_write(int fd, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func)
 {
     debugs(5, 5, "comm_write: FD " << fd << ": sz " << size << ": asynCall " << callback);
 
     /* Make sure we are open, not closing, and not writing */
     assert(isOpen(fd));
     assert(!fd_table[fd].closing());
     comm_io_callback_t *ccb = COMMIO_FD_WRITECB(fd);
     assert(!ccb->active());
 
     fd_table[fd].writeStart = squid_curtime;
     /* Queue the write */
     commio_set_callback(fd, IOCB_WRITE, ccb, callback,
                         (char *)buf, free_func, size);
+
+    commSelectOrQueueWrite(fd);
+}
+
+// called when fd needs to write: quota-limited clients wait in line for their quota, all others are selected for writing right away
+static void
+commSelectOrQueueWrite(const int fd)
+{
+    comm_io_callback_t *ccb = COMMIO_FD_WRITECB(fd);
+
+#if DELAY_POOLS
+    // stand in line if there is one; the reservation ID lets the queue detect stale entries later
+    if (ClientInfo *clientInfo = fd_table[fd].clientInfo) {
+        if (clientInfo->writeLimitingActive) {
+            ccb->quotaQueueReserv = clientInfo->quotaEnqueue(fd);
+            clientInfo->kickQuotaQueue();
+            return;
+        }
+    }
+#endif
+
     commSetSelect(fd, COMM_SELECT_WRITE, commHandleWrite, ccb, 0);
 }
 
 
 /* a wrapper around comm_write to allow for MemBuf to be comm_written in a snap */
 void
 comm_write_mbuf(int fd, MemBuf *mb, IOCB * handler, void *handler_data)
 {
     comm_write(fd, mb->buf, mb->size, handler, handler_data, mb->freeFunc());
 }
 
 void
 comm_write_mbuf(int fd, MemBuf *mb, AsyncCall::Pointer &callback)
 {
     comm_write(fd, mb->buf, mb->size, callback, mb->freeFunc());
 }
 
 
 /*
  * hm, this might be too general-purpose for all the places we'd

=== modified file 'src/fde.h'
--- src/fde.h	2010-10-06 13:03:11 +0000
+++ src/fde.h	2010-10-12 11:30:27 +0000
@@ -16,40 +16,43 @@
  *  the Free Software Foundation; either version 2 of the License, or
  *  (at your option) any later version.
  *
  *  This program is distributed in the hope that it will be useful,
  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  *  GNU General Public License for more details.
  *
  *  You should have received a copy of the GNU General Public License
  *  along with this program; if not, write to the Free Software
  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
  *
  */
 
 #ifndef SQUID_FDE_H
 #define SQUID_FDE_H
 
 #include "comm.h"
 #include "ip/Address.h"
 
+#if DELAY_POOLS
+class ClientInfo;
+#endif
 class PconnPool;
 
 class fde
 {
 
 public:
     fde() { clear(); };
 
     /// True if comm_close for this fd has been called
     bool closing() { return flags.close_request; }
 
     /* NOTE: memset is used on fdes today. 20030715 RBC */
     static void DumpStats (StoreEntry *);
 
     char const *remoteAddr() const;
     void dumpStats (StoreEntry &, int);
     bool readPending(int);
     void noteUse(PconnPool *);
 
 public:
@@ -72,40 +75,43 @@
         unsigned int socket_eof:1;
         unsigned int nolinger:1;
         unsigned int nonblocking:1;
         unsigned int ipc:1;
         unsigned int called_connect:1;
         unsigned int nodelay:1;
         unsigned int close_on_exec:1;
         unsigned int read_pending:1;
         unsigned int write_pending:1;
         unsigned int transparent:1;
     } flags;
 
     int64_t bytes_read;
     int64_t bytes_written;
 
     struct {
         int uses;                   /* ie # req's over persistent conn */
         PconnPool *pool;
     } pconn;
 
+#if DELAY_POOLS
+    ClientInfo * clientInfo;/* pointer to client info used in client write limiter or NULL if not present */
+#endif
     unsigned epoll_state;
 
     struct _fde_disk disk;
     PF *read_handler;
     void *read_data;
     PF *write_handler;
     void *write_data;
     AsyncCall::Pointer timeoutHandler;
     time_t timeout;
     time_t writeStart;
     void *lifetime_data;
     AsyncCall::Pointer closeHandler;
     AsyncCall::Pointer halfClosedReader; /// read handler for half-closed fds
     CommWriteStateData *wstate;         /* State data for comm_write */
     READ_HANDLER *read_method;
     WRITE_HANDLER *write_method;
 #if USE_SSL
     SSL *ssl;
 #endif
 #ifdef _SQUID_MSWIN_
@@ -123,40 +129,41 @@
                                             nfmarkToServer in that this is the value we *receive* from the,
                                             connection, whereas nfmarkToServer is the value to set on packets
                                             *leaving* Squid.   */
 
 private:
     /** Clear the fde class back to NULL equivalent. */
     inline void clear() {
         type = 0;
         remote_port = 0;
         local_addr.SetEmpty();
         tosToServer = '\0';
         nfmarkToServer = 0;
         sock_family = 0;
         memset(ipaddr, '\0', MAX_IPSTRLEN);
         memset(desc,'\0',FD_DESC_SZ);
         memset(&flags,0,sizeof(_fde_flags));
         bytes_read = 0;
         bytes_written = 0;
         pconn.uses = 0;
         pconn.pool = NULL;
+        clientInfo = NULL;
         epoll_state = 0;
         memset(&disk, 0, sizeof(_fde_disk));
         read_handler = NULL;
         read_data = NULL;
         write_handler = NULL;
         write_data = NULL;
         timeoutHandler = NULL;
         timeout = 0;
         writeStart = 0;
         lifetime_data = NULL;
         closeHandler = NULL;
         halfClosedReader = NULL;
         wstate = NULL;
         read_method = NULL;
         write_method = NULL;
 #if USE_SSL
         ssl = NULL;
 #endif
 #ifdef _SQUID_MSWIN_
         win32.handle = NULL;

=== modified file 'src/main.cc'
--- src/main.cc	2010-10-07 13:07:12 +0000
+++ src/main.cc	2010-10-11 15:47:38 +0000
@@ -63,40 +63,44 @@
 #include "ip/tools.h"
 #if USE_EPOLL
 #include "comm_epoll.h"
 #endif
 #if USE_KQUEUE
 #include "comm_kqueue.h"
 #endif
 #if USE_POLL
 #include "comm_poll.h"
 #endif
 #if defined(USE_SELECT) || defined(USE_SELECT_WIN32)
 #include "comm_select.h"
 #endif
 #include "SquidTime.h"
 #include "SwapDir.h"
 #include "forward.h"
 #include "MemPool.h"
 #include "icmp/IcmpSquid.h"
 #include "icmp/net_db.h"
 
+#if DELAY_POOLS
+#include "ClientDelayConfig.h"
+#endif
+
 #if USE_LOADABLE_MODULES
 #include "LoadableModules.h"
 #endif
 
 #if ICAP_CLIENT
 #include "adaptation/icap/Config.h"
 #endif
 #if USE_ECAP
 #include "adaptation/ecap/Config.h"
 #endif
 #if USE_ADAPTATION
 #include "adaptation/Config.h"
 #endif
 
 #if USE_SQUID_ESI
 #include "esi/Module.h"
 #endif
 
 #include "fs/Module.h"
 
@@ -822,40 +826,44 @@
 
     if (IamPrimaryProcess()) {
 #if USE_WCCP
 
         wccpInit();
 #endif
 #if USE_WCCPv2
 
         wccp2Init();
 #endif
     }
 
     serverConnectionsOpen();
 
     neighbors_init();
 
     storeDirOpenSwapLogs();
 
     mimeInit(Config.mimeTablePathname);
 
+#if DELAY_POOLS
+    Config.ClientDelay.finalize();
+#endif
+
     if (Config.onoff.announce) {
         if (!eventFind(start_announce, NULL))
             eventAdd("start_announce", start_announce, NULL, 3600.0, 1);
     } else {
         if (eventFind(start_announce, NULL))
             eventDelete(start_announce, NULL);
     }
 
     writePidFile();		/* write PID file */
 
     debugs(1, 1, "Ready to serve requests.");
 
     reconfiguring = 0;
 }
 
 static void
 mainRotate(void)
 {
     icmpEngine.Close();
 #if USE_DNSSERVERS
@@ -1144,40 +1152,44 @@
     bool enableAdaptation = false;
 
     // We can remove this dependency on specific adaptation mechanisms
     // if we create a generic Registry of such mechanisms. Should we?
 #if ICAP_CLIENT
     Adaptation::Icap::TheConfig.finalize();
     enableAdaptation = Adaptation::Icap::TheConfig.onoff || enableAdaptation;
 #endif
 #if USE_ECAP
     Adaptation::Ecap::TheConfig.finalize(); // must be after we load modules
     enableAdaptation = Adaptation::Ecap::TheConfig.onoff || enableAdaptation;
 #endif
     // must be the last adaptation-related finalize
     Adaptation::Config::Finalize(enableAdaptation);
 #endif
 
 #if USE_SQUID_ESI
     Esi::Init();
 #endif
 
+#if DELAY_POOLS
+    Config.ClientDelay.finalize();
+#endif
+
     debugs(1, 1, "Ready to serve requests.");
 
     if (!configured_once) {
         eventAdd("storeMaintain", Store::Maintain, NULL, 1.0, 1);
 
         if (Config.onoff.announce)
             eventAdd("start_announce", start_announce, NULL, 3600.0, 1);
 
         eventAdd("ipcache_purgelru", ipcache_purgelru, NULL, 10.0, 1);
 
         eventAdd("fqdncache_purgelru", fqdncache_purgelru, NULL, 15.0, 1);
 
 #if USE_XPROF_STATS
 
         eventAdd("cpuProfiling", xprof_event, NULL, 1.0, 1);
 
 #endif
 
         eventAdd("memPoolCleanIdlePools", Mem::CleanIdlePools, NULL, 15.0, 1);
     }

=== modified file 'src/protos.h'
--- src/protos.h	2010-10-06 03:50:45 +0000
+++ src/protos.h	2010-10-11 15:57:56 +0000
@@ -28,40 +28,43 @@
  */
 #ifndef SQUID_PROTOS_H
 #define SQUID_PROTOS_H
 
 /* included for routines that have not moved out to their proper homes
  * yet.
  */
 #include "Packer.h"
 /* for routines still in this file that take CacheManager parameters */
 #include "ip/Address.h"
 /* for parameters that still need these */
 #include "enums.h"
 /* some parameters stil need this */
 #include "wordlist.h"
 
 /* for parameters that still need these */
 #include "lookup_t.h"
 
 
 class HttpRequestMethod;
+#if DELAY_POOLS
+class ClientInfo;
+#endif
 
 
 #if USE_FORW_VIA_DB
 SQUIDCEXTERN void fvdbCountVia(const char *key);
 SQUIDCEXTERN void fvdbCountForw(const char *key);
 #endif
 #if HEADERS_LOG
 SQUIDCEXTERN void headersLog(int cs, int pq, const HttpRequestMethod& m, void *data);
 #endif
 SQUIDCEXTERN char *log_quote(const char *header);
 SQUIDCEXTERN int logTypeIsATcpHit(log_type);
 
 /*
  * cache_cf.c
  */
 SQUIDCEXTERN void configFreeMemory(void);
 class MemBuf;
 SQUIDCEXTERN void wordlistCat(const wordlist *, MemBuf * mb);
 SQUIDCEXTERN void self_destruct(void);
 SQUIDCEXTERN void add_http_port(char *portspec);
@@ -72,40 +75,44 @@
 /* extra functions from cache_cf.c useful for lib modules */
 SQUIDCEXTERN void parse_int(int *var);
 SQUIDCEXTERN void parse_onoff(int *var);
 SQUIDCEXTERN void parse_eol(char *volatile *var);
 SQUIDCEXTERN void parse_wordlist(wordlist ** list);
 SQUIDCEXTERN void requirePathnameExists(const char *name, const char *path);
 SQUIDCEXTERN void parse_time_t(time_t * var);
 
 
 /* client_side.c - FD related client side routines */
 
 SQUIDCEXTERN void clientdbInit(void);
 
 SQUIDCEXTERN void clientdbUpdate(const Ip::Address &, log_type, protocol_t, size_t);
 
 SQUIDCEXTERN int clientdbCutoffDenied(const Ip::Address &);
 void clientdbDump(StoreEntry *);
 SQUIDCEXTERN void clientdbFreeMemory(void);
 
 SQUIDCEXTERN int clientdbEstablished(const Ip::Address &, int);
+#if DELAY_POOLS
+SQUIDCEXTERN void clientdbSetWriteLimiter(ClientInfo * info, const int writeSpeedLimit,const double initialBurst,const double highWatermark);
+SQUIDCEXTERN ClientInfo * clientdbGetInfo(const Ip::Address &addr);
+#endif
 SQUIDCEXTERN void clientOpenListenSockets(void);
 SQUIDCEXTERN void clientHttpConnectionsClose(void);
 SQUIDCEXTERN void httpRequestFree(void *);
 
 extern void clientAccessCheck(void *);
 
 #include "Debug.h"
 
 /* see debug.c for info on context-based debugging */
 SQUIDCEXTERN Ctx ctx_enter(const char *descr);
 SQUIDCEXTERN void ctx_exit(Ctx ctx);
 
 SQUIDCEXTERN void _db_set_syslog(const char *facility);
 SQUIDCEXTERN void _db_init(const char *logfile, const char *options);
 SQUIDCEXTERN void _db_rotate_log(void);
 
 /* packs, then prints an object using debugs() */
 SQUIDCEXTERN void debugObj(int section, int level, const char *label, void *obj, ObjPackMethod pm);
 
 /* disk.c */

=== modified file 'src/structs.h'
--- src/structs.h	2010-10-06 03:50:45 +0000
+++ src/structs.h	2010-10-11 15:47:38 +0000
@@ -109,40 +109,41 @@
 
 struct acl_size_t {
     acl_size_t *next;
     ACLList *aclList;
     int64_t size;
 };
 
 struct ushortlist {
     u_short i;
     ushortlist *next;
 };
 
 struct relist {
     char *pattern;
     regex_t regex;
     relist *next;
 };
 
 #if DELAY_POOLS
 #include "DelayConfig.h"
+#include "ClientDelayConfig.h"
 #endif
 
 #if USE_ICMP
 #include "icmp/IcmpConfig.h"
 #endif
 
 #include "HelperChildConfig.h"
 
 /* forward decl for SquidConfig, see RemovalPolicy.h */
 
 class CpuAffinityMap;
 class RemovalPolicySettings;
 class external_acl;
 class Store;
 
 struct SquidConfig {
 
     struct {
         /* These should be for the Store::Root instance.
         * this needs pluggable parsing to be done smoothly.
@@ -518,40 +519,41 @@
         int use_short_names;
     } icons;
     char *errorDirectory;
 #if USE_ERR_LOCALES
     char *errorDefaultLanguage;
     int errorLogMissingLanguages;
 #endif
     char *errorStylesheet;
 
     struct {
         int maxtries;
         int onerror;
     } retry;
 
     struct {
         int64_t limit;
     } MemPools;
 #if DELAY_POOLS
 
     DelayConfig Delay;
+    ClientDelayConfig ClientDelay;
 #endif
 
     struct {
         int icp_average;
         int dns_average;
         int http_average;
         int icp_min_poll;
         int dns_min_poll;
         int http_min_poll;
     } comm_incoming;
     int max_open_disk_fds;
     int uri_whitespace;
     acl_size_t *rangeOffsetLimit;
 #if MULTICAST_MISS_STREAM
 
     struct {
 
         Ip::Address addr;
         int ttl;
         unsigned short port;

Reply via email to