This is a patch for bug 2619. Please see the Squid Bugzilla entry for the
problem description and discussion:
http://bugs.squid-cache.org/show_bug.cgi?id=2619
This patch is similar to the original patch developed by Martin Huter,
plus some code to solve the same problem in REQMOD ICAP requests and
some other minor fixes.
author: Martin Huter <[email protected]>, Alex Rousskov <[email protected]>, Christos Tsantilas <[email protected]>
Bug 2619: Excessive RAM growth due to unlimited adapted body data consumption
If the client does not read from the open connection (e.g., the user does not
confirm the browser's download message box in Microsoft IE), Squid keeps
reading data from the ICAP server into the store entry, even though no more
data can be delivered to the client.
Thus the in-memory store entry keeps growing, and in the worst case Squid may
consume memory up to the size of the user's download.
This patch adds an API to StoreEntry that calls the producer back when
memory/space in the StoreEntry is released, and adds code to the ICAP client
so that it does not consume body data coming from the ICAP server when there
is no space available in the store entry.
=== modified file 'src/Server.cc'
--- src/Server.cc 2011-10-23 00:16:42 +0000
+++ src/Server.cc 2011-11-11 17:03:03 +0000
@@ -33,54 +33,57 @@
*/
#include "squid.h"
#include "acl/Gadgets.h"
#include "base/TextException.h"
#include "comm/Connection.h"
#include "comm/forward.h"
#include "comm/Write.h"
#include "Server.h"
#include "Store.h"
#include "HttpRequest.h"
#include "HttpReply.h"
#include "errorpage.h"
#include "err_detail_type.h"
#include "SquidTime.h"
#if USE_ADAPTATION
#include "adaptation/AccessCheck.h"
#include "adaptation/Answer.h"
#include "adaptation/Iterator.h"
+#include "base/AsyncCall.h" // AsyncCall::Pointer for StoreEntry::deferProducer() callbacks
#endif
// implemented in client_side_reply.cc until sides have a common parent
extern void purgeEntriesByUrl(HttpRequest * req, const char *url);
ServerStateData::ServerStateData(FwdState *theFwdState): AsyncJob("ServerStateData"),
requestSender(NULL),
#if USE_ADAPTATION
adaptedHeadSource(NULL),
adaptationAccessCheckPending(false),
startedAdaptation(false),
#endif
- receivedWholeRequestBody(false)
+ receivedWholeRequestBody(false),
+ theVirginReply(NULL),
+ theFinalReply(NULL)
{
fwd = theFwdState;
entry = fwd->entry;
entry->lock();
request = HTTPMSGLOCK(fwd->request);
}
ServerStateData::~ServerStateData()
{
// paranoid: check that swanSong has been called
assert(!requestBodySource);
#if USE_ADAPTATION
assert(!virginBodyDestination);
assert(!adaptedBodySource);
#endif
entry->unlock();
@@ -256,67 +259,70 @@
ServerStateData::abortOnBadEntry(const char *abortReason)
{
if (entry->isAccepting())
return false;
debugs(11,5, HERE << "entry is not Accepting!");
abortTransaction(abortReason);
return true;
}
// more request or adapted response body is available
void
ServerStateData::noteMoreBodyDataAvailable(BodyPipe::Pointer bp)
{
#if USE_ADAPTATION
if (adaptedBodySource == bp) {
handleMoreAdaptedBodyAvailable();
return;
}
#endif
- handleMoreRequestBodyAvailable();
+ if (requestBodySource == bp) // ignore notifications about any other pipe
+ handleMoreRequestBodyAvailable();
}
// the entire request or adapted response body was provided, successfully
void
ServerStateData::noteBodyProductionEnded(BodyPipe::Pointer bp)
{
#if USE_ADAPTATION
if (adaptedBodySource == bp) {
handleAdaptedBodyProductionEnded();
return;
}
#endif
- handleRequestBodyProductionEnded();
+ if (requestBodySource == bp) // ignore notifications about any other pipe
+ handleRequestBodyProductionEnded();
}
// premature end of the request or adapted response body production
void
ServerStateData::noteBodyProducerAborted(BodyPipe::Pointer bp)
{
#if USE_ADAPTATION
if (adaptedBodySource == bp) {
handleAdaptedBodyProducerAborted();
return;
}
#endif
- handleRequestBodyProducerAborted();
+ if (requestBodySource == bp) // ignore notifications about any other pipe
+ handleRequestBodyProducerAborted();
}
// more origin request body data is available
void
ServerStateData::handleMoreRequestBodyAvailable()
{
if (!requestSender)
sendMoreRequestBody();
else
debugs(9,3, HERE << "waiting for request body write to complete");
}
// there will be no more handleMoreRequestBodyAvailable calls
void
ServerStateData::handleRequestBodyProductionEnded()
{
receivedWholeRequestBody = true;
if (!requestSender)
doneSendingRequestBody();
@@ -681,70 +687,127 @@
}
HttpReply *rep = dynamic_cast<HttpReply*>(msg);
assert(rep);
debugs(11,5, HERE << this << " setting adapted reply to " << rep);
setFinalReply(rep);
assert(!adaptedBodySource);
if (rep->body_pipe != NULL) {
// subscribe to receive adapted body
adaptedBodySource = rep->body_pipe;
// assume that ICAP does not auto-consume on failures
assert(adaptedBodySource->setConsumerIfNotLate(this));
} else {
// no body
if (doneWithAdaptation()) // we may still be sending virgin response
handleAdaptationCompleted();
}
}
-// more adapted response body is available
+// called back via StoreEntry::kickProducer() after we deferred body storage
void
-ServerStateData::handleMoreAdaptedBodyAvailable()
+ServerStateData::resumeBodyStorage()
{
- const size_t contentSize = adaptedBodySource->buf().contentSize();
+ if (abortOnBadEntry("store entry aborted while kick producer callback"))
+ return;
- debugs(11,5, HERE << "consuming " << contentSize << " bytes of adapted " <<
- "response body at offset " << adaptedBodySource->consumedSize());
+ if(!adaptedBodySource)
+ return;
+
+ handleMoreAdaptedBodyAvailable();
+ if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
+ endAdaptedBodyConsumption();
+}
+
+// more adapted response body is available
+void
+ServerStateData::handleMoreAdaptedBodyAvailable()
+{
if (abortOnBadEntry("entry refuses adapted body"))
return;
assert(entry);
+
+ size_t contentSize = adaptedBodySource->buf().contentSize();
+
+ if (!contentSize)
+ return; // XXX: bytesWanted asserts on zero-size ranges
+
+ // XXX: entry->bytesWanted returns contentSize-1 if entry can accept data.
+ // We have to add 1 to avoid suspending forever.
+ const size_t bytesWanted = entry->bytesWanted(Range<size_t>(0, contentSize));
+ const size_t spaceAvailable = bytesWanted > 0 ? (bytesWanted + 1) : 0;
+
+ if (spaceAvailable < contentSize ) {
+ // No or partial body data consuming
+ typedef NullaryMemFunT<ServerStateData> Dialer;
+ AsyncCall::Pointer call = asyncCall(93, 5, "ServerStateData::resumeBodyStorage",
+ Dialer(this, &ServerStateData::resumeBodyStorage));
+ entry->deferProducer(call);
+ }
+
+ // XXX: bytesWanted API does not allow us to write just one byte!
+ if (!spaceAvailable && contentSize > 1) {
+ debugs(11, 5, HERE << "NOT storing " << contentSize << " bytes of adapted " <<
+ "response body at offset " << adaptedBodySource->consumedSize());
+ return;
+ }
+
+ // A zero spaceAvailable here implies contentSize is 1: store that byte
+ // anyway, because clamping it to zero would never make progress.
+ if (spaceAvailable && spaceAvailable < contentSize) {
+ debugs(11, 5, HERE << "postponing storage of " <<
+ (contentSize - spaceAvailable) << " body bytes");
+ contentSize = spaceAvailable;
+ }
+
+ debugs(11,5, HERE << "storing " << contentSize << " bytes of adapted " <<
+ "response body at offset " << adaptedBodySource->consumedSize());
+
BodyPipeCheckout bpc(*adaptedBodySource);
- const StoreIOBuffer ioBuf(&bpc.buf, currentOffset);
- currentOffset += bpc.buf.size;
+ const StoreIOBuffer ioBuf(&bpc.buf, currentOffset, contentSize);
+ currentOffset += ioBuf.length;
entry->write(ioBuf);
bpc.buf.consume(contentSize);
bpc.checkIn();
}
// the entire adapted response body was produced, successfully
void
ServerStateData::handleAdaptedBodyProductionEnded()
{
- stopConsumingFrom(adaptedBodySource);
-
if (abortOnBadEntry("entry went bad while waiting for adapted body eof"))
return;
+
+ // end consumption if we consumed everything
+ if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
+ endAdaptedBodyConsumption();
+ // else resumeBodyStorage() will eventually consume the rest
+}
+// stops consuming the (exhausted) adapted body and completes adaptation
+void
+ServerStateData::endAdaptedBodyConsumption()
+{
+ stopConsumingFrom(adaptedBodySource);
handleAdaptationCompleted();
}
// premature end of the adapted response body
void ServerStateData::handleAdaptedBodyProducerAborted()
{
stopConsumingFrom(adaptedBodySource);
handleAdaptationAborted();
}
// common part of noteAdaptationAnswer and handleAdaptedBodyProductionEnded
void
ServerStateData::handleAdaptationCompleted()
{
debugs(11,5, HERE << "handleAdaptationCompleted");
cleanAdaptation();
// We stop reading origin response because we have no place to put it and
// cannot use it. If some origin servers do not like that or if we want to
// reuse more pconns, we can add code to discard unneeded origin responses.
=== modified file 'src/Server.h'
--- src/Server.h 2011-10-21 16:20:42 +0000
+++ src/Server.h 2011-11-11 13:42:13 +0000
@@ -130,40 +130,45 @@
virtual bool doneWithServer() const = 0; /**< did we end communication? */
/// Entry-dependent callbacks use this check to quit if the entry went bad
bool abortOnBadEntry(const char *abortReason);
#if USE_ADAPTATION
void startAdaptation(const Adaptation::ServiceGroupPointer &group, HttpRequest *cause);
void adaptVirginReplyBody(const char *buf, ssize_t len);
void cleanAdaptation();
virtual bool doneWithAdaptation() const; /**< did we end ICAP communication? */
// BodyConsumer for ICAP: consume adapted response body.
void handleMoreAdaptedBodyAvailable();
void handleAdaptedBodyProductionEnded();
void handleAdaptedBodyProducerAborted();
void handleAdaptedHeader(HttpMsg *msg);
void handleAdaptationCompleted();
void handleAdaptationBlocked(const Adaptation::Answer &answer);
void handleAdaptationAborted(bool bypassable = false);
+
+ /// called back via StoreEntry::kickProducer() after body storage was deferred
+ void resumeBodyStorage();
+ /// stops consuming the exhausted adapted body and completes adaptation
+ void endAdaptedBodyConsumption();
#endif
protected:
const HttpReply *virginReply() const;
HttpReply *virginReply();
HttpReply *setVirginReply(HttpReply *r);
HttpReply *finalReply();
HttpReply *setFinalReply(HttpReply *r);
// Kids use these to stuff data into the response instead of messing with the entry directly
void adaptOrFinalizeReply();
void addVirginReplyBody(const char *buf, ssize_t len);
void storeReplyBody(const char *buf, ssize_t len);
size_t replyBodySpace(const MemBuf &readBuf, const size_t minSpace) const;
void adjustBodyBytesRead(const int64_t delta);
// These should be private
int64_t currentOffset; /**< Our current offset in the StoreEntry */
=== modified file 'src/Store.h'
--- src/Store.h 2011-10-14 16:21:48 +0000
+++ src/Store.h 2011-11-11 14:08:21 +0000
@@ -184,43 +184,55 @@
void setReleaseFlag();
#if USE_SQUID_ESI
ESIElement::Pointer cachedESITree;
#endif
/** append bytes to the buffer */
virtual void append(char const *, int len);
/** disable sending content to the clients */
virtual void buffer();
/** flush any buffered content */
virtual void flush();
/** reduce the memory lock count on the entry */
virtual int unlock();
/** increate the memory lock count on the entry */
virtual int64_t objectLen() const;
virtual int64_t contentLen() const;
virtual void lock();
virtual void release();
+#if USE_ADAPTATION
+ /// remembers producer for kickProducer(); ignored if one is already pending
+ void deferProducer(const AsyncCall::Pointer &producer);
+ /// schedules and forgets the producer registered by deferProducer(), if any
+ void kickProducer();
+#endif
+
private:
static MemAllocator *pool;
+#if USE_ADAPTATION
+ /// producer callback registered by deferProducer(); at most one at a time
+ AsyncCall::Pointer deferredProducer;
+#endif
+
bool validLength() const;
bool hasOneOfEtags(const String &reqETags, const bool allowWeakMatch) const;
};
std::ostream &operator <<(std::ostream &os, const StoreEntry &e);
/// \ingroup StoreAPI
class NullStoreEntry:public StoreEntry
{
public:
static NullStoreEntry *getInstance();
bool isNull() {
return true;
}
const char *getMD5Text() const;
_SQUID_INLINE_ HttpReply const *getReply() const;
void write (StoreIOBuffer) {}
=== modified file 'src/StoreIOBuffer.h'
--- src/StoreIOBuffer.h 2010-10-29 00:12:28 +0000
+++ src/StoreIOBuffer.h 2011-11-07 11:05:28 +0000
@@ -42,40 +42,49 @@
class StoreIOBuffer
{
public:
StoreIOBuffer():length(0), offset (0), data (NULL) {flags.error = 0;}
StoreIOBuffer(size_t aLength, int64_t anOffset, char *someData) :
length (aLength), offset (anOffset), data (someData) {
flags.error = 0;
}
/* Create a StoreIOBuffer from a MemBuf and offset */
/* NOTE that MemBuf still "owns" the pointers, StoreIOBuffer is just borrowing them */
StoreIOBuffer(MemBuf *aMemBuf, int64_t anOffset) :
length(aMemBuf->contentSize()),
offset (anOffset),
data(aMemBuf->content()) {
flags.error = 0;
}
+ /* Create a StoreIOBuffer borrowing the first aLength bytes of MemBuf content */
+ /* aLength is not checked here; it should not exceed aMemBuf->contentSize() */
+ StoreIOBuffer(MemBuf *aMemBuf, int64_t anOffset, size_t aLength) :
+ length(aLength),
+ offset (anOffset),
+ data(aMemBuf->content()) {
+ flags.error = 0;
+ }
+
Range<int64_t> range() const {
return Range<int64_t>(offset, offset + length);
}
void dump() const {
if (fwrite(data, length, 1, stderr)) {}
if (fwrite("\n", 1, 1, stderr)) {}
}
struct {
unsigned error:1;
} flags;
size_t length;
int64_t offset;
char *data;
};
inline
std::ostream &
operator <<(std::ostream &os, const StoreIOBuffer &b)
=== modified file 'src/client_side_request.cc'
--- src/client_side_request.cc 2011-10-21 16:20:42 +0000
+++ src/client_side_request.cc 2011-11-11 17:03:21 +0000
@@ -1664,71 +1664,99 @@
}
// we are done with getting headers (but may be receiving body)
clearAdaptation(virginHeadSource);
if (!request_satisfaction_mode)
doCallouts();
}
void
ClientHttpRequest::handleAdaptationBlock(const Adaptation::Answer &answer)
{
request->detailError(ERR_ACCESS_DENIED, ERR_DETAIL_REQMOD_BLOCK);
AclMatchedName = answer.ruleId.termedBuf();
assert(calloutContext);
calloutContext->clientAccessCheckDone(ACCESS_DENIED);
AclMatchedName = NULL;
}
void
+ClientHttpRequest::resumeBodyStorage()
+{
+ // called back via StoreEntry::kickProducer() after we deferred body storage
+ if(!adaptedBodySource)
+ return;
+
+ noteMoreBodyDataAvailable(adaptedBodySource);
+}
+
+void
ClientHttpRequest::noteMoreBodyDataAvailable(BodyPipe::Pointer)
{
assert(request_satisfaction_mode);
assert(adaptedBodySource != NULL);
- if (const size_t contentSize = adaptedBodySource->buf().contentSize()) {
+ if (size_t contentSize = adaptedBodySource->buf().contentSize()) {
+ // XXX: entry->bytesWanted returns contentSize-1 if entry can accept data.
+ // We have to add 1 to avoid suspending forever.
+ const size_t bytesWanted = storeEntry()->bytesWanted(Range<size_t>(0,contentSize));
+ const size_t spaceAvailable = bytesWanted > 0 ? (bytesWanted + 1) : 0;
+
+ if (spaceAvailable < contentSize ) {
+ // No or partial body data consuming
+ typedef NullaryMemFunT<ClientHttpRequest> Dialer;
+ AsyncCall::Pointer call = asyncCall(93, 5, "ClientHttpRequest::resumeBodyStorage",
+ Dialer(this, &ClientHttpRequest::resumeBodyStorage));
+ storeEntry()->deferProducer(call);
+ }
+
+ // XXX: bytesWanted API does not allow us to write just one byte!
+ if (!spaceAvailable && contentSize > 1)
+ return;
+
+ // A zero spaceAvailable here implies contentSize is 1: store that byte
+ // anyway, because clamping it to zero would never make progress.
+ if (spaceAvailable && spaceAvailable < contentSize)
+ contentSize = spaceAvailable;
+
BodyPipeCheckout bpc(*adaptedBodySource);
- const StoreIOBuffer ioBuf(&bpc.buf, request_satisfaction_offset);
+ const StoreIOBuffer ioBuf(&bpc.buf, request_satisfaction_offset, contentSize);
storeEntry()->write(ioBuf);
- // assume can write everything
- request_satisfaction_offset += contentSize;
+ // assume StoreEntry::write() writes the entire ioBuf
+ request_satisfaction_offset += ioBuf.length;
bpc.buf.consume(contentSize);
bpc.checkIn();
}
if (adaptedBodySource->exhausted())
endRequestSatisfaction();
// else wait for more body data
}
void
ClientHttpRequest::noteBodyProductionEnded(BodyPipe::Pointer)
{
assert(!virginHeadSource);
- if (adaptedBodySource != NULL) { // did not end request satisfaction yet
- // We do not expect more because noteMoreBodyDataAvailable always
- // consumes everything. We do not even have a mechanism to consume
- // leftovers after noteMoreBodyDataAvailable notifications seize.
- assert(adaptedBodySource->exhausted());
+ // should we end request satisfaction now?
+ if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
endRequestSatisfaction();
- }
}
void
ClientHttpRequest::endRequestSatisfaction()
{
debugs(85,4, HERE << this << " ends request satisfaction");
assert(request_satisfaction_mode);
stopConsumingFrom(adaptedBodySource);
// TODO: anything else needed to end store entry formation correctly?
storeEntry()->complete();
}
void
ClientHttpRequest::noteBodyProducerAborted(BodyPipe::Pointer)
{
assert(!virginHeadSource);
stopConsumingFrom(adaptedBodySource);
debugs(85,3, HERE << "REQMOD body production failed");
=== modified file 'src/client_side_request.h'
--- src/client_side_request.h 2011-10-21 16:20:42 +0000
+++ src/client_side_request.h 2011-11-10 15:03:23 +0000
@@ -171,40 +171,42 @@
public:
void startAdaptation(const Adaptation::ServiceGroupPointer &g);
// private but exposed for ClientRequestContext
void handleAdaptationFailure(int errDetail, bool bypassable = false);
private:
// Adaptation::Initiator API
virtual void noteAdaptationAnswer(const Adaptation::Answer &answer);
void handleAdaptedHeader(HttpMsg *msg);
void handleAdaptationBlock(const Adaptation::Answer &answer);
virtual void noteAdaptationAclCheckDone(Adaptation::ServiceGroupPointer group);
// BodyConsumer API, called by BodyPipe
virtual void noteMoreBodyDataAvailable(BodyPipe::Pointer);
virtual void noteBodyProductionEnded(BodyPipe::Pointer);
virtual void noteBodyProducerAborted(BodyPipe::Pointer);
void endRequestSatisfaction();
+ /// called back via StoreEntry::kickProducer() after body storage was deferred
+ void resumeBodyStorage();
private:
CbcPointer<Adaptation::Initiate> virginHeadSource;
BodyPipe::Pointer adaptedBodySource;
bool request_satisfaction_mode;
int64_t request_satisfaction_offset;
#endif
};
/* client http based routines */
SQUIDCEXTERN char *clientConstructTraceEcho(ClientHttpRequest *);
class ACLFilledChecklist;
SQUIDCEXTERN ACLFilledChecklist *clientAclChecklistCreate(const acl_access * acl,ClientHttpRequest * http);
SQUIDCEXTERN int clientHttpRequestStatus(int fd, ClientHttpRequest const *http);
SQUIDCEXTERN void clientAccessCheck(ClientHttpRequest *);
/* ones that should be elsewhere */
SQUIDCEXTERN void redirectStart(ClientHttpRequest *, RH *, void *);
=== modified file 'src/store.cc'
--- src/store.cc 2011-10-27 20:39:23 +0000
+++ src/store.cc 2011-11-11 15:13:54 +0000
@@ -393,40 +393,61 @@
{
debugs(20, 3, HERE << "new StoreEntry " << this);
mem_obj = new MemObject(aUrl, aLogUrl);
expires = lastmod = lastref = timestamp = -1;
swap_status = SWAPOUT_NONE;
swap_filen = -1;
swap_dirn = -1;
}
StoreEntry::~StoreEntry()
{
if (swap_filen >= 0) {
SwapDir &sd = dynamic_cast<SwapDir&>(*store());
sd.disconnect(*this);
}
delete hidden_mem_obj;
}
+#if USE_ADAPTATION
+void
+StoreEntry::deferProducer(const AsyncCall::Pointer &producer)
+{
+ if (!deferredProducer)
+ deferredProducer = producer; // first registration wins; later ones are only logged
+ else
+ debugs(20, 5, HERE << "Deferred producer call is allready set to: " <<
+ *deferredProducer << ", requested call: " << *producer);
+}
+
+void
+StoreEntry::kickProducer()
+{
+ if(deferredProducer != NULL){
+ ScheduleCallHere(deferredProducer);
+ deferredProducer = NULL; // each registration is called back at most once
+ }
+}
+#endif
+
void
StoreEntry::destroyMemObject()
{
debugs(20, 3, HERE << "destroyMemObject " << mem_obj);
setMemStatus(NOT_IN_MEMORY);
MemObject *mem = mem_obj;
mem_obj = NULL;
delete mem;
delete hidden_mem_obj;
hidden_mem_obj = NULL;
}
void
StoreEntry::hideMemObject()
{
debugs(20, 3, HERE << "hiding " << mem_obj);
assert(mem_obj);
assert(!hidden_mem_obj);
hidden_mem_obj = mem_obj;
mem_obj = NULL;
=== modified file 'src/store_client.cc'
--- src/store_client.cc 2011-02-15 04:02:28 +0000
+++ src/store_client.cc 2011-11-10 16:47:48 +0000
@@ -245,40 +245,52 @@
#endif
/* range requests will skip into the body */
cmp_offset = copyRequest.offset;
_callback = Callback (callback_fn, cbdataReference(data));
copyInto.data = copyRequest.data;
copyInto.length = copyRequest.length;
copyInto.offset = copyRequest.offset;
static bool copying (false);
assert (!copying);
copying = true;
PROF_start(storeClient_kickReads);
/* we might be blocking comm reads due to readahead limits
* now we have a new offset, trigger those reads...
*/
entry->mem_obj->kickReads();
PROF_stop(storeClient_kickReads);
copying = false;
+#if USE_ADAPTATION
+ // storeClientCopy2() may call back into the client, which may unregister
+ // (delete) this store_client; use a locked, on-stack entry pointer below.
+ StoreEntry *const copiedEntry = entry;
+ copiedEntry->lock();
+#endif
storeClientCopy2(entry, this);
+
+#if USE_ADAPTATION
+ copiedEntry->kickProducer();
+ copiedEntry->unlock();
+#endif
+ // Add no code here: this store_client object may no longer exist.
}
/*
* This function is used below to decide if we have any more data to
* send to the client. If the store_status is STORE_PENDING, then we
* do have more data to send. If its STORE_OK, then
* we continue checking. If the object length is negative, then we
* don't know the real length and must open the swap file to find out.
* If the length is >= 0, then we compare it to the requested copy
* offset.
*/
static int
storeClientNoMoreToSend(StoreEntry * e, store_client * sc)
{
int64_t len;
if (e->store_status == STORE_PENDING)
return 0;
if ((len = e->objectLen()) < 0)
@@ -709,40 +721,44 @@
if (sc->_callback.pending()) {
/* callback with ssize = -1 to indicate unexpected termination */
debugs(90, 3, "storeUnregister: store_client for " << mem->url << " has a callback");
sc->fail();
}
#if STORE_CLIENT_LIST_DEBUG
cbdataReferenceDone(sc->owner);
#endif
delete sc;
assert(e->lock_count > 0);
if (mem->nclients == 0)
CheckQuickAbort(e);
else
mem->kickReads();
+#if USE_ADAPTATION
+ e->kickProducer();
+#endif
+
return 1;
}
/* Call handlers waiting for data to be appended to E. */
void
StoreEntry::invokeHandlers()
{
/* Commit what we can to disk, if appropriate */
swapOut();
int i = 0;
store_client *sc;
dlink_node *nx = NULL;
dlink_node *node;
PROF_start(InvokeHandlers);
debugs(90, 3, "InvokeHandlers: " << getMD5Text() );
/* walk the entire list looking for valid callbacks */
for (node = mem_obj->clients.head; node; node = nx) {