Author: tabish
Date: Sat Feb 25 23:38:33 2012
New Revision: 1293724
URL: http://svn.apache.org/viewvc?rev=1293724&view=rev
Log:
Bit more work on Executors.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ExecutorService.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/LockSupport.cpp
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.cpp?rev=1293724&r1=1293723&r2=1293724&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.cpp
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.cpp
Sat Feb 25 23:38:33 2012
@@ -21,6 +21,7 @@ using namespace decaf;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
////////////////////////////////////////////////////////////////////////////////
AbstractExecutorService::AbstractExecutorService() : ExecutorService() {
@@ -29,3 +30,9 @@ AbstractExecutorService::AbstractExecuto
////////////////////////////////////////////////////////////////////////////////
AbstractExecutorService::~AbstractExecutorService() {
}
+
+////////////////////////////////////////////////////////////////////////////////
+void AbstractExecutorService::doSubmit(FutureType* future) {
+
+ throw UnsupportedOperationException();
+}
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.h?rev=1293724&r1=1293723&r2=1293724&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.h
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.h
Sat Feb 25 23:38:33 2012
@@ -40,6 +40,10 @@ namespace concurrent {
AbstractExecutorService();
virtual ~AbstractExecutorService();
+ protected:
+
+ virtual void doSubmit(FutureType* future);
+
};
}}}
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ExecutorService.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ExecutorService.h?rev=1293724&r1=1293723&r2=1293724&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ExecutorService.h
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ExecutorService.h
Sat Feb 25 23:38:33 2012
@@ -22,6 +22,7 @@
#include <decaf/lang/Runnable.h>
#include <decaf/util/ArrayList.h>
+#include <decaf/util/concurrent/Future.h>
#include <decaf/util/concurrent/Executor.h>
#include <decaf/util/concurrent/TimeUnit.h>
#include <decaf/lang/exceptions/InterruptedException.h>
@@ -106,6 +107,33 @@ namespace concurrent {
*/
virtual bool isTerminated() const = 0;
+ /**
+ * Submits a Runnable object for execution. A Future object is
created and returned
+ * that will return the default value of the template type upon
completion.
+ *
+ * @param task
+ * Pointer to a Runnable object that will be executed by this
ExecutorService.
+ *
+ * @returns a new Future<?> pointer that is owned by the caller.
+ *
+ * @throws NullPointerException if the Runnable pointer passed is NULL.
+ */
+ template<typename E>
+ Future<E>* submit(decaf::lang::Runnable* task) {
+ return NULL;
+ }
+
+ protected:
+
+ /**
+ * Perform the actual submit of a FutureType instance, the caller is
responsible for
+ * creating the properly typed Future<E> object and returning that to
its caller.
+ *
+ * @param future
+ * Pointer to a base FutureType instance that is to be submitted
to the Executor.
+ */
+ virtual void doSubmit(FutureType* future) = 0;
+
};
}}}
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp?rev=1293724&r1=1293723&r2=1293724&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp
Sat Feb 25 23:38:33 2012
@@ -25,6 +25,7 @@
#include <decaf/util/concurrent/locks/ReentrantLock.h>
#include <decaf/util/concurrent/locks/AbstractQueuedSynchronizer.h>
#include <decaf/util/concurrent/CountDownLatch.h>
+#include <decaf/util/concurrent/ConcurrentStlMap.h>
#include <decaf/util/concurrent/atomic/AtomicInteger.h>
#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
#include <decaf/util/concurrent/RejectedExecutionException.h>
@@ -53,6 +54,30 @@ namespace decaf{
namespace util{
namespace concurrent{
+ using decaf::lang::Pointer;
+
+ /**
+ * Any task that we don't own we wrap in this Runnable object so that the
+ * task deletion logic can remain unchanged and thread safe.
+ */
+ class UnownedTaskWrapper : public Runnable {
+ private:
+
+ Runnable* task;
+
+ public:
+
+ UnownedTaskWrapper(Runnable* task) : Runnable(), task(task) {
+ }
+
+ virtual ~UnownedTaskWrapper() {
+ }
+
+ virtual void run() {
+ this->task->run();
+ }
+ };
+
/**
* The main pool control state, ctl, is an atomic integer packing
* two conceptual fields
@@ -352,12 +377,19 @@ namespace concurrent{
// Ensure dead Worker Threads are destroyed, the Timer might
not have
// run recently.
- Pointer< Iterator<Worker*> >
iter(this->deadWorkers.iterator());
- while(iter->hasNext()) {
- Worker* worker = iter->next();
+ Pointer< Iterator<Worker*> >
workers(this->deadWorkers.iterator());
+ while(workers->hasNext()) {
+ Worker* worker = workers->next();
worker->thread->join();
delete worker;
}
+
+ Pointer< Iterator<Runnable*> >
tasks(this->workQueue->iterator());
+ while(tasks->hasNext()) {
+ delete tasks->next();
+ }
+
+ this->workQueue->clear();
}
DECAF_CATCH_NOTHROW(Exception)
DECAF_CATCHALL_NOTHROW()
@@ -688,12 +720,22 @@ namespace concurrent{
processWorkerExit(w, completedAbruptly);
}
- void execute(Runnable* task, bool takeOwnership DECAF_UNUSED) {
+ void execute(Runnable* task, bool takeOwnership) {
if (task == NULL) {
throw NullPointerException(__FILE__, __LINE__, "Runnable task
cannot be NULL");
}
+ Runnable* target = task;
+
+ /**
+ * If we don't own it then wrap it so that our deletion logic is
+ * still valid.
+ */
+ if (!takeOwnership) {
+ target = new UnownedTaskWrapper(task);
+ }
+
/*
* Proceed in 3 steps:
*
@@ -716,21 +758,21 @@ namespace concurrent{
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
- if (addWorker(task, true)) {
+ if (addWorker(target, true)) {
return;
}
c = ctl.get();
}
- if (isRunning(c) && workQueue->offer(task)) {
+ if (isRunning(c) && workQueue->offer(target)) {
int recheck = ctl.get();
- if (!isRunning(recheck) && this->remove(task)) {
- this->rejectionHandler->rejectedExecution(task,
this->parent);
+ if (!isRunning(recheck) && this->remove(target)) {
+ this->rejectionHandler->rejectedExecution(target,
this->parent);
} else if (workerCountOf(recheck) == 0) {
addWorker(NULL, false);
}
- } else if (!addWorker(task, false)) {
- this->rejectionHandler->rejectedExecution(task, this->parent);
+ } else if (!addWorker(target, false)) {
+ this->rejectionHandler->rejectedExecution(target,
this->parent);
}
}
@@ -935,9 +977,9 @@ namespace concurrent{
}
bool remove(Runnable* task) {
- bool removed = this->workQueue->remove(task);
+ bool result = this->workQueue->remove(task);
this->tryTerminate();
- return removed;
+ return result;
}
private:
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/LockSupport.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/LockSupport.cpp?rev=1293724&r1=1293723&r2=1293724&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/LockSupport.cpp
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/LockSupport.cpp
Sat Feb 25 23:38:33 2012
@@ -41,24 +41,25 @@ LockSupport::~LockSupport() {
////////////////////////////////////////////////////////////////////////////////
void LockSupport::unpark( decaf::lang::Thread* thread ) throw() {
- try{
- Threading::unpark( thread );
- } DECAF_CATCHALL_NOTHROW()
+ try {
+ Threading::unpark(thread);
+ }
+ DECAF_CATCHALL_NOTHROW()
}
////////////////////////////////////////////////////////////////////////////////
void LockSupport::park() throw() {
- try{
- Threading::park( Thread::currentThread() );
- } DECAF_CATCHALL_NOTHROW()
+ try {
+ Threading::park(Thread::currentThread());
+ }
+ DECAF_CATCHALL_NOTHROW()
}
////////////////////////////////////////////////////////////////////////////////
void LockSupport::parkNanos( long long nanos ) throw() {
- try{
-
+ try {
long long mills = 0;
if( nanos >= 1000000 ) {
@@ -67,8 +68,8 @@ void LockSupport::parkNanos( long long n
}
Threading::park(Thread::currentThread(), mills, (int)nanos);
-
- } DECAF_CATCHALL_NOTHROW()
+ }
+ DECAF_CATCHALL_NOTHROW()
}
////////////////////////////////////////////////////////////////////////////////
@@ -83,6 +84,6 @@ void LockSupport::parkUntil( long long d
}
Threading::park(Thread::currentThread(), ( deadline - now ), 0);
-
- } DECAF_CATCHALL_NOTHROW()
+ }
+ DECAF_CATCHALL_NOTHROW()
}