Author: tabish
Date: Fri Aug 3 20:33:18 2012
New Revision: 1369198
URL: http://svn.apache.org/viewvc?rev=1369198&view=rev
Log:
Fix a race in the destructor of ThreadPoolExecutor due to the count of active
Worker threads being decremented too early, leading to segfaults when the
destructor was allowed to finish while the workers were still shutting down.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/lang/PointerTest.cpp
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp?rev=1369198&r1=1369197&r2=1369198&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp
Fri Aug 3 20:33:18 2012
@@ -499,18 +499,20 @@ namespace concurrent{
* termination possible -- reducing worker count or removing tasks
* from the queue during shutdown. The method is non-private to
* allow access from ScheduledThreadPoolExecutor.
+ *
+ * @returns true if the termination succeeded.
*/
- void tryTerminate() {
+ bool tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) || runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && !workQueue->isEmpty())) {
- return;
+ return false;
}
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
- return;
+ return false;
}
mainLock.lock();
@@ -528,7 +530,7 @@ namespace concurrent{
ctl.set(ctlOf(TERMINATED, 0));
termination->signalAll();
mainLock.unlock();
- return;
+ return true;
}
} catch(Exception& ex) {
mainLock.unlock();
@@ -537,6 +539,8 @@ namespace concurrent{
mainLock.unlock();
// else retry on failed CAS
}
+
+ return false;
}
/**
@@ -1133,6 +1137,7 @@ namespace concurrent{
mainLock.unlock();
throw;
}
+
mainLock.unlock();
t->start();
@@ -1151,7 +1156,7 @@ namespace concurrent{
/**
* Performs cleanup and bookkeeping for a dying worker. Called only
from
* worker threads. Unless completedAbruptly is set, assumes that
workerCount
- * has already been adjusted to account for exit. This method removes
+ * has not already been adjusted to account for exit. This method
removes
* thread from worker set, and possibly terminates the pool or
replaces the
* worker if either it exited due to user task exception or if fewer
than
* corePoolSize workers are running or queue is non-empty but there
are no
@@ -1162,11 +1167,7 @@ namespace concurrent{
* @param completedAbruptly
* Indicates if the worker died due to user exception.
*/
- void processWorkerExit(Worker* w, bool completedAbruptly) {
-
- if (completedAbruptly) { // If abrupt, then workerCount wasn't
adjusted
- decrementWorkerCount();
- }
+ void processWorkerExit(Worker* w, bool completedAbruptly DECAF_UNUSED)
{
mainLock.lock();
try {
@@ -1175,9 +1176,12 @@ namespace concurrent{
this->deadWorkers.add(w);
} catch(...) {
}
+ decrementWorkerCount();
mainLock.unlock();
- tryTerminate();
+ if (tryTerminate()) {
+ return;
+ }
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
@@ -1208,7 +1212,7 @@ namespace concurrent{
* both before and after the timed wait.
*
* @return task, or NULL if the worker must exit, in which case
- * workerCount is decremented
+ * workerCount is decremented when the task completes.
*/
Runnable* getTask() {
bool timedOut = false; // Did the last poll() time out?
@@ -1220,7 +1224,6 @@ namespace concurrent{
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue->isEmpty())) {
- decrementWorkerCount();
return NULL;
}
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/lang/PointerTest.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/lang/PointerTest.cpp?rev=1369198&r1=1369197&r2=1369198&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/lang/PointerTest.cpp
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/lang/PointerTest.cpp
Fri Aug 3 20:33:18 2012
@@ -25,6 +25,7 @@
#include <map>
#include <string>
+#include <vector>
using namespace std;
using namespace decaf;
@@ -34,20 +35,30 @@ using namespace decaf::util::concurrent;
////////////////////////////////////////////////////////////////////////////////
class TestClassBase {
+protected:
+
+ std::vector<int> content;
+
public:
virtual ~TestClassBase(){}
virtual std::string returnHello() = 0;
+ int getSize() const {
+ return content.size();
+ }
};
////////////////////////////////////////////////////////////////////////////////
class TestClassA : public TestClassBase {
public:
+ TestClassA() : TestClassBase() {
+ this->content.resize(1);
+ }
+
virtual ~TestClassA() {
- //std::cout << std::endl << "TestClassA - Destructor" << std::endl;
}
std::string returnHello() {
@@ -60,8 +71,11 @@ public:
class TestClassB : public TestClassBase {
public:
+ TestClassB() : TestClassBase() {
+ this->content.resize(2);
+ }
+
virtual ~TestClassB() {
- //std::cout << std::endl << "TestClassB - Destructor" << std::endl;
}
std::string returnHello() {
@@ -371,51 +385,6 @@ void PointerTest::testSTLContainers() {
CPPUNIT_ASSERT( *( testMap2.rbegin()->first ) == 3 );
}
-//////////////////////////////////////////////////////////////////////////////////
-//class SelfCounting {
-//private:
-//
-// int refCount;
-//
-//public:
-//
-// SelfCounting() : refCount( 0 ) {}
-// SelfCounting( const SelfCounting& other ) : refCount( other.refCount ) {}
-//
-// void addReference() { this->refCount++; }
-// bool releaseReference() { return !( --this->refCount ); }
-//
-// std::string returnHello() { return "Hello"; }
-//};
-//
-//////////////////////////////////////////////////////////////////////////////////
-//void PointerTest::testInvasive() {
-//
-// Pointer< SelfCounting, InvasiveCounter<SelfCounting> > thePointer( new
SelfCounting );
-//
-// // Test Null Initialize
-// Pointer< SelfCounting, InvasiveCounter<SelfCounting> > nullPointer;
-// CPPUNIT_ASSERT( nullPointer.get() == NULL );
-//
-// // Test Value Constructor
-// Pointer< SelfCounting, InvasiveCounter<SelfCounting> > pointer(
thePointer );
-// CPPUNIT_ASSERT( pointer.get() == thePointer );
-//
-// // Test Copy Constructor
-// Pointer< SelfCounting, InvasiveCounter<SelfCounting> > ctorCopy( pointer
);
-// CPPUNIT_ASSERT( ctorCopy.get() == thePointer );
-//
-// // Test Assignment
-// Pointer< SelfCounting, InvasiveCounter<SelfCounting> > copy = pointer;
-// CPPUNIT_ASSERT( copy.get() == thePointer );
-//
-// CPPUNIT_ASSERT( ( *pointer ).returnHello() == "Hello" );
-// CPPUNIT_ASSERT( pointer->returnHello() == "Hello" );
-//
-// copy.reset( NULL );
-// CPPUNIT_ASSERT( copy.get() == NULL );
-//}
-
////////////////////////////////////////////////////////////////////////////////
TestClassBase* methodReturnRawPointer() {
@@ -444,11 +413,13 @@ void PointerTest::testDynamicCast() {
CPPUNIT_ASSERT_NO_THROW(
ptrTestClassA = pointer1.dynamicCast<TestClassA>() );
CPPUNIT_ASSERT( ptrTestClassA != NULL );
+ CPPUNIT_ASSERT( ptrTestClassA->getSize() == 1 );
Pointer<TestClassB> ptrTestClassB;
CPPUNIT_ASSERT_NO_THROW(
ptrTestClassB = pointer2.dynamicCast<TestClassB>() );
CPPUNIT_ASSERT( ptrTestClassB != NULL );
+ CPPUNIT_ASSERT( ptrTestClassB->getSize() == 2 );
Pointer<TestClassA> ptrTestClassA2;
CPPUNIT_ASSERT_THROW_MESSAGE(
@@ -461,14 +432,20 @@ void PointerTest::testDynamicCast() {
"Should Throw a ClassCastException",
ptrTestClassA2 = nullPointer.dynamicCast<TestClassA>(),
ClassCastException );
+
+ Pointer<TestClassBase> basePointer =
ptrTestClassA.dynamicCast<TestClassBase>();
+ CPPUNIT_ASSERT( basePointer->getSize() == 1 );
+
+ basePointer = ptrTestClassB.dynamicCast<TestClassBase>();
+ CPPUNIT_ASSERT( basePointer->getSize() == 2 );
}
////////////////////////////////////////////////////////////////////////////////
class Gate {
private:
- CountDownLatch * enter_latch;
- CountDownLatch * leave_latch;
+ CountDownLatch * enterLatch;
+ CountDownLatch * leaveLatch;
Mutex mutex;
bool closed;
@@ -478,8 +455,8 @@ public:
virtual ~Gate() {}
void open( int count ) {
- leave_latch = new CountDownLatch( count );
- enter_latch = new CountDownLatch( count );
+ leaveLatch = new CountDownLatch( count );
+ enterLatch = new CountDownLatch( count );
mutex.lock();
closed = false;
mutex.notifyAll();
@@ -490,21 +467,21 @@ public:
mutex.lock();
while( closed )
mutex.wait();
- enter_latch->countDown();
- if (enter_latch->getCount() == 0) {
+ enterLatch->countDown();
+ if (enterLatch->getCount() == 0) {
closed = true;
}
mutex.unlock();
}
void leave() {
- leave_latch->countDown();
+ leaveLatch->countDown();
}
void close() {
- leave_latch->await();
- delete leave_latch;
- delete enter_latch;
+ leaveLatch->await();
+ delete leaveLatch;
+ delete enterLatch;
}
};
@@ -512,23 +489,23 @@ public:
class PointerTestThread: public Thread {
private:
- Gate *_gate;
- Pointer<std::string> _s;
+ Gate *gate;
+ Pointer<std::string> s;
public:
- PointerTestThread( Gate *gate ) : _gate( gate ) {}
+ PointerTestThread( Gate *gate ) : gate( gate ) {}
virtual ~PointerTestThread() {}
void setString( Pointer<std::string> s ) {
- _s = s;
+ this->s = s;
}
virtual void run() {
for( int j = 0; j < 1000; j++ ) {
- _gate->enter();
- _s.reset( NULL );
- _gate->leave();
+ gate->enter();
+ s.reset( NULL );
+ gate->leave();
}
}
};