Author: astitcher
Date: Mon Dec 7 15:42:14 2009
New Revision: 887956
URL: http://svn.apache.org/viewvc?rev=887956&view=rev
Log:
QPID-2214: Opening and closing client connections causes memory use to grow
unboundedly
- Clean up the DeletionManager state for each thread when the thread exits
Modified:
qpid/trunk/qpid/cpp/src/qpid/sys/DeletionManager.h
qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/DeletionManager.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/DeletionManager.h?rev=887956&r1=887955&r2=887956&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/DeletionManager.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/DeletionManager.h Mon Dec 7 15:42:14 2009
@@ -54,6 +54,8 @@
template <typename H>
class DeletionManager
{
+ struct ThreadStatus;
+
public:
// Mark every thread as using the handle - it will be deleted
// below after every thread marks the handle as unused
@@ -65,6 +67,28 @@
// handles get deleted here when no one else
// is using them either
static void markAllUnusedInThisThread() {
+ ThreadStatus* threadStatus = getThreadStatus();
+ ScopedLock<Mutex> l(threadStatus->lock);
+
+ // The actual deletions will happen here when all the shared_ptr
+ // ref counts hit 0 (that is when every thread marks the handle unused)
+ threadStatus->handles.clear();
+ }
+
+ static void destroyThreadState() {
+ ThreadStatus* threadStatus = getThreadStatus();
+ {
+ ScopedLock<Mutex> l(threadStatus->lock);
+
+ allThreadsStatuses.delThreadStatus(threadStatus);
+ }
+ delete threadStatus;
+ threadStatus = 0;
+ }
+
+private:
+
+ static ThreadStatus*& getThreadStatus() {
static __thread ThreadStatus* threadStatus = 0;
// Thread local vars can't be dynamically constructed so we need
@@ -75,14 +99,9 @@
allThreadsStatuses.addThreadStatus(threadStatus);
}
- ScopedLock<Mutex> l(threadStatus->lock);
-
- // The actual deletions will happen here when all the shared_ptr
- // ref counts hit 0 (that is when every thread marks the handle unused)
- threadStatus->handles.clear();
+ return threadStatus;
}
-private:
typedef boost::shared_ptr<H> shared_ptr;
// In theory we know that we never need more handles than the number of
@@ -125,6 +144,15 @@
statuses.push_back(t);
}
+ void delThreadStatus(ThreadStatus* t) {
+ ScopedLock<Mutex> l(lock);
+ typename std::vector<ThreadStatus*>::iterator it =
+ std::find(statuses.begin(),statuses.end(), t);
+ if (it != statuses.end()) {
+ statuses.erase(it);
+ }
+ }
+
void addHandle(shared_ptr h) {
ScopedLock<Mutex> l(lock);
std::for_each(statuses.begin(), statuses.end(), handleAdder(h));
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp?rev=887956&r1=887955&r2=887956&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Mon Dec 7 15:42:14 2009
@@ -25,6 +25,7 @@
#include "qpid/sys/DeletionManager.h"
#include "qpid/sys/posix/check.h"
#include "qpid/sys/posix/PrivatePosix.h"
+#include "qpid/log/Statement.h"
#include <sys/epoll.h>
#include <errno.h>
@@ -467,28 +468,35 @@
}
void Poller::run() {
- // Make sure we can't be interrupted by signals at a bad time
- ::sigset_t ss;
- ::sigfillset(&ss);
- ::pthread_sigmask(SIG_SETMASK, &ss, 0);
-
- do {
- Event event = wait();
-
- // If can read/write then dispatch appropriate callbacks
- if (event.handle) {
- event.process();
- } else {
- // Handle shutdown
- switch (event.type) {
- case SHUTDOWN:
- return;
- default:
- // This should be impossible
- assert(false);
+ // Ensure that we exit thread responsibly under all circumstances
+ try {
+ // Make sure we can't be interrupted by signals at a bad time
+ ::sigset_t ss;
+ ::sigfillset(&ss);
+ ::pthread_sigmask(SIG_SETMASK, &ss, 0);
+
+ do {
+ Event event = wait();
+
+ // If can read/write then dispatch appropriate callbacks
+ if (event.handle) {
+ event.process();
+ } else {
+ // Handle shutdown
+ switch (event.type) {
+ case SHUTDOWN:
+ PollerHandleDeletionManager.destroyThreadState();
+ return;
+ default:
+ // This should be impossible
+ assert(false);
+ }
}
- }
- } while (true);
+ } while (true);
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "IO worker thread exiting with unhandled exception: " << e.what());
+ }
+ PollerHandleDeletionManager.destroyThreadState();
}
Poller::Event Poller::wait(Duration timeout) {
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]