Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h?rev=1091195&r1=1091194&r2=1091195&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h Mon Apr 11 20:22:39 2011 @@ -18,10 +18,15 @@ #define _DECAF_UTIL_CONCURRENT_THREADPOOLEXECUTOR_H_ #include <decaf/lang/Runnable.h> +#include <decaf/lang/Throwable.h> #include <decaf/util/concurrent/ThreadFactory.h> #include <decaf/util/concurrent/BlockingQueue.h> #include <decaf/util/concurrent/TimeUnit.h> +#include <decaf/util/concurrent/AbstractExecutorService.h> +#include <decaf/util/concurrent/RejectedExecutionHandler.h> +#include <decaf/util/concurrent/RejectedExecutionException.h> #include <decaf/util/LinkedList.h> +#include <decaf/util/ArrayList.h> #include <decaf/util/Config.h> #include <vector> @@ -50,7 +55,7 @@ namespace concurrent{ * object that implements the <code>Runnable</code> interface and * one of the worker threads will executing it in its thread context. */ - class DECAF_API ThreadPoolExecutor { + class DECAF_API ThreadPoolExecutor : public AbstractExecutorService { private: ThreadPoolExecutor( const ThreadPoolExecutor& ); @@ -93,48 +98,130 @@ namespace concurrent{ long long keepAliveTime, const TimeUnit& unit, BlockingQueue<decaf::lang::Runnable*>* workQueue); - virtual ~ThreadPoolExecutor(); - /** - * Queue a task to be completed by one of the Pooled Threads at some point in the - * future. The task can be rejected by this executor if it has been shut down or - * if the workQueue is full, rejected Runnables are not deleted by this executor. - * Upon successful return from this method the given Runnable pointer is considered - * to be owned by this Executor and will be deleted upon completion or shut down. + * Creates a new instance of a ThreadPoolExecutor. * - * @param task - * The Runnable object that is to be executed. + * The executor instance is configured with the passed in parameters and a + * default thread Factory is used along with a default rejected execution + * handler. + * + * @param corePoolSize + * The number of threads to pool regardless of their idle state. + * @param maxPoolSize + * The maximum number of threads that will ever exist at one time in the pool. + * @param keepAliveTime + * The maximum time to keep a thread in the pool for if the number of current + * threads exceeds to core pool size. + * @param unit + * The units that the keepAliveTime is specified in. + * @param workQueue + * A BlockingQueue implementation that will be used to hold Runnable tasks + * that are awaiting execution within this executor. The Executor takes + * ownership of the BlockingQueue instance passed once this method returns. + * @param handler + * A RejectedExecutionHandler implementation that will be used to handle any + * rejected tasks when they are submitted to this executor. The Executor takes + * ownership of the RejectedExecutionHandler instance passed once this method returns. * - * @throws RejectedExecutionException based on instruction from RejectedExecutionHandler - * if the given task cannot be accepted for execution at this time. - * @throws NullPointerException - if command is null + * @throws IllegalArguementException if the corePoolSize or keepAliveTime are negative + * or the or if maximumPoolSize is less than or equal to zero, or if corePoolSize + * is greater than maximumPoolSize. + * @throws NullPointerException if the workQueue pointer is NULL. */ - virtual void execute(decaf::lang::Runnable* task); + ThreadPoolExecutor(int corePoolSize, int maxPoolSize, + long long keepAliveTime, const TimeUnit& unit, + BlockingQueue<decaf::lang::Runnable*>* workQueue, + RejectedExecutionHandler* handler); /** - * Performs an orderly shutdown of this Executor. Previously queued tasks are allowed - * to complete but no new tasks are accepted for execution. Calling this method more - * than once has no affect on this executor. + * Creates a new instance of a ThreadPoolExecutor. + * + * The executor instance is configured with the passed in parameters and a + * default thread Factory is used along with a default rejected execution + * handler. + * + * @param corePoolSize + * The number of threads to pool regardless of their idle state. + * @param maxPoolSize + * The maximum number of threads that will ever exist at one time in the pool. + * @param keepAliveTime + * The maximum time to keep a thread in the pool for if the number of current + * threads exceeds to core pool size. + * @param unit + * The units that the keepAliveTime is specified in. + * @param workQueue + * A BlockingQueue implementation that will be used to hold Runnable tasks + * that are awaiting execution within this executor. The Executor takes + * ownership of the BlockingQueue instance passed once this method returns. + * @param threadFactory + * A ThreadFactory implementation that will be used to create worker threads + * that are used by this executor to run the submitted tasks. The Executor takes + * ownership of the ThreadFactory instance passed once this method returns. + * + * @throws IllegalArguementException if the corePoolSize or keepAliveTime are negative + * or the or if maximumPoolSize is less than or equal to zero, or if corePoolSize + * is greater than maximumPoolSize. + * @throws NullPointerException if the workQueue pointer is NULL. */ - virtual void shutdown(); + ThreadPoolExecutor(int corePoolSize, int maxPoolSize, + long long keepAliveTime, const TimeUnit& unit, + BlockingQueue<decaf::lang::Runnable*>* workQueue, + ThreadFactory* threadFactory); /** - * The caller will block until the executor has completed termination meaning all tasks - * that where scheduled before shutdown have now completed and the executor is ready for - * deletion. If the timeout period elapses before the executor reaches the terminated - * state then this method return false to indicate it has not terminated. + * Creates a new instance of a ThreadPoolExecutor. * - * @param timeout - * The amount of time to wait before abandoning the wait for termination. - * @param unit - * The unit of time that the timeout value represents. + * The executor instance is configured with the passed in parameters and a + * default thread Factory is used along with a default rejected execution + * handler. * - * @return true if the executor terminated or false if the timeout expired. + * @param corePoolSize + * The number of threads to pool regardless of their idle state. + * @param maxPoolSize + * The maximum number of threads that will ever exist at one time in the pool. + * @param keepAliveTime + * The maximum time to keep a thread in the pool for if the number of current + * threads exceeds to core pool size. + * @param unit + * The units that the keepAliveTime is specified in. + * @param workQueue + * A BlockingQueue implementation that will be used to hold Runnable tasks + * that are awaiting execution within this executor. The Executor takes + * ownership of the BlockingQueue instance passed once this method returns. + * @param threadFactory + * A ThreadFactory implementation that will be used to create worker threads + * that are used by this executor to run the submitted tasks. The Executor takes + * ownership of the ThreadFactory instance passed once this method returns. + * @param handler + * A RejectedExecutionHandler implementation that will be used to handle any + * rejected tasks when they are submitted to this executor. The Executor takes + * ownership of the BlockingQueue instance passed once this method returns. * - * @throws InterruptedException if this call is interrupted while awaiting termination. + * @throws IllegalArguementException if the corePoolSize or keepAliveTime are negative + * or the or if maximumPoolSize is less than or equal to zero, or if corePoolSize + * is greater than maximumPoolSize. + * @throws NullPointerException if the workQueue pointer is NULL. */ + ThreadPoolExecutor(int corePoolSize, int maxPoolSize, + long long keepAliveTime, const TimeUnit& unit, + BlockingQueue<decaf::lang::Runnable*>* workQueue, + ThreadFactory* threadFactory, + RejectedExecutionHandler* handler); + + virtual ~ThreadPoolExecutor(); + + virtual void execute(decaf::lang::Runnable* task); + + virtual void shutdown(); + + virtual ArrayList<decaf::lang::Runnable*> shutdownNow(); + virtual bool awaitTermination(long long timeout, const decaf::util::concurrent::TimeUnit& unit); + virtual bool isShutdown() const; + + virtual bool isTerminated() const; + /** * Returns the number of threads that currently exists for this Executor. * @@ -150,6 +237,21 @@ namespace concurrent{ virtual int getCorePoolSize() const; /** + * Set the number of threads that this executor treats as its core threads, this value + * will override the value set in the constructor. If the value given is less than the + * current value then the core threads will shrink to the new value over time. If the + * value is larger than the current value then new threads may be started to process + * currently pending tasks, otherwise they will be started as needed when new tasks + * arrive. + * + * @param poolSize + * The new core pool size for this executor. + * + * @throws IllegalArgumentException if the pool size value is less than zero. + */ + virtual void setCorePoolSize(int poolSize); + + /** * Returns the configured maximum number of threads for this Executor. * * @return the configured maximum number of Threads. @@ -157,6 +259,19 @@ namespace concurrent{ virtual int getMaximumPoolSize() const; /** + * Sets the maximum number of workers this Executor is allowed to have at any given + * time above the core pool size. This new value overrides any set in the constructor + * and if smaller than the current value worker threads will terminate as they complete + * their current tasks and become idle. + * + * @param maxSize + * The new maximum allowed worker pool size. + * + * @throws IllegalArgumentException if maxSize is negative or less than core pool size. + */ + virtual void setMaximumPoolSize(int maxSize); + + /** * Returns the current number of pending tasks in the work queue. This is * an approximation as the number of pending tasks can quickly changes as * tasks complete and new tasks are started. @@ -190,28 +305,309 @@ namespace concurrent{ virtual int getLargestPoolSize() const; /** - * Returns whether this executor has been shutdown or not. + * Provides access to the Task Queue used by this Executor. This method is meant mainly + * for debugging and monitoring, care should be taken when using this method. The executor + * continues to execute tasks from the Queue. * - * @return true if this executor has been shutdown. + * @returns a pointer to the blocking queue that this executor stores future tasks in. */ - virtual bool isShutdown() const; + virtual BlockingQueue<decaf::lang::Runnable*>* getQueue(); /** - * Returns whether all tasks have completed after this executor was shut down. + * Returns true if the executor has begin the process of terminating but has not yet + * completed the process of shutting down all worker threads. If the Executor does + * not transition from this state to terminated after some time its generally an + * indication that one of the submitted tasks will not complete and the executor is + * locked in a terminating state. * * @return true if all tasks have completed after a request to shut down was made. */ - virtual bool isTerminated() const; + virtual bool isTerminating() const; + + /** + * When true this setting allows the threads in the core pool to terminate if they + * sit idle longer than the set keep alive time. Core threads that terminate are + * replaced as needed by new ones on demand. This settings requires that the set + * keep alive time be greater than zero and will throw an IllegalArguementException + * if that is not the case. + * + * @param value + * Boolean value indicating if core threads are allowed to time out when idle. + * + * @throws IllegalArgumentException if the keep alive time is set to zero. + */ + virtual void allowCoreThreadTimeout(bool value); + + /** + * Returns whether this executor has been configured to allow core threads to + * terminate if they sit idle longer than the configured keep alive time. Threads + * that are not core threads continue to time out using the set keep alive value + * regardless of whether this option is enabled. + * + * @returns true if core threads can timeout when idle. + */ + virtual bool allowsCoreThreadTimeout() const; + + /** + * Returns the currently set value for the maximum amount of time a worker Thread + * that is not part of the core threads is allowed to sit idle before it terminates. + * + * @param unit + * The unit of time to return the results in. + * + * @returns the configure keep alive time in the requested time units. + */ + virtual long long getKeepAliveTime(const TimeUnit& unit) const; + + /** + * Configures the amount of time a non core Thread will remain alive after it has + * completed its assigned task. This value can also be applied to core threads if + * the allowCoreThreadsTimeout option is enabled. + * + * @param timeout + * The amount of time an idle worker will live before terminating. + * @param unit + * The units that the timeout is given in. + * + * @throws IllegalArgumentException if allowCoreThreadsTimeout is enabled and the + * the timeout value given is zero, or the timeout given is negative. + */ + virtual void setKeepAliveTime(long long timeout, const TimeUnit& unit); + + /** + * Sets the ThreadFactory instance used to create new Threads for this Executor. + * + * This class takes ownership of the given ThreadFactory and will destroy it + * upon termination or when a new ThreadFactory is set using this method. + * + * @param factory + * A ThreadFactory instance used by this Executor to create new Threads. + * + * @throws NullPointerException if the given factory pointer is NULL. + */ + virtual void setThreadFactory(ThreadFactory* factory); + + /** + * Gets the currently configured ThreadFactory. It is considered a programming + * error to delete the pointer returned by this method. + * + * @returns the currently configured ThreadFactory instance used by this object. + */ + virtual ThreadFactory* getThreadFactory() const; + + /** + * Gets the currently configured RejectedExecutionHandler for this Executor. + * + * @returns a pointer to the current RejectedExecutionHandler. + */ + virtual RejectedExecutionHandler* getRejectedExecutionHandler() const; + + /** + * Sets the new RejectedExecutionHandler that this executor should use to process any + * rejected Runnable tasks. This executor takes ownership of the supplied pointer and + * will desotroy it upon termination, any previous handler is destroyed by this call. + * + * @param handler + * The new RejectedExecutionHandler instance to use. + * + * @throws NullPointerException if the handler is NULL. + */ + virtual void setRejectedExecutionHandler(RejectedExecutionHandler* handler); + + /** + * By default a Core thread is only created once the first task is queued, this method + * forces the creation of core thread that waits in an idle mode for new work to be + * enqueued. If the limit on core threads has already been reached then this method + * returns false. + * + * @return true if a new core thread was added, false otherwise. + */ + virtual bool prestartCoreThread(); + + /** + * This method will create and start new core threads running in an idle state waiting for + * new tasks up to the set core thread limit. When the limit is reached this method returns + * zero to indicate no more core threads can be created. + * + * @returns the number of core threads created, or zero if the limit has already been met. + */ + virtual int prestartAllCoreThreads(); + + /** + * Attempts to remove the Runnable from the work queue, if successful then the caller + * now owns the Runnable and is responsible for deleting it. + * + * @param task + * The task that is to be removed from the work queue. + * + * @returns true if the task was removed from the Queue. + */ + bool remove(decaf::lang::Runnable* task); + + /** + * Attempts to remove any Future derived tasks from the pending work queue if they have + * been canceled. This method can be used to more quickly remove and reclaim space as + * canceled tasks are not run but must await a worker thread to be removed normally. + * Since there are multiple threads in operation its possible for this method to not + * remove all canceled tasks from the work queue. + */ + virtual void purge(); protected: /** + * Method called before a task is executed by the given thread. The default implementation + * of this method does nothing, however a subclass can override this method to add some + * new functionality. + * + * It is recommended that a subclass call this method on its base class to ensure that + * all base classes have a chance to process this event. + * + * @param thread + * The thread that will be executing the given task. + * @param task + * The task that will be executed by the given thread. + */ + virtual void beforeExecute(decaf::lang::Thread* thread, decaf::lang::Runnable* task); + + /** + * Called upon completion of execution of a given task. This method is called + * from the Thread that executed the given Runnable. If the Throwable pointer is + * not NULL then its value is the Exception that caused the task to terminate. + * + * The base class implementation does nothing, a derived class should call this + * method on its base class to ensure that all subclasses have a chance to process + * the afterExecute event. + * + * @param task + * The Runnable instance that was executed by the calling Thread. + * @param error + * The exception that was thrown from the given Runnable. + */ + virtual void afterExecute(decaf::lang::Runnable* task, decaf::lang::Throwable* error); + + /** * Method invoked when the Executor has terminated, by default this method does * nothing. When overridden the subclass should call superclass::terminated to * ensure that all subclasses have their terminated method invoked. */ virtual void terminated(); + public: // RejectedExecutionHandler implementations. + + /** + * Handler policy for tasks that are rejected upon a call to ThreadPoolExecutor::execute + * this class always throws a RejectedExecutionException. + * + * @since 1.0 + */ + class AbortPolicy : public RejectedExecutionHandler { + public: + + AbortPolicy() : RejectedExecutionHandler() { + } + + virtual ~AbortPolicy() { + } + + virtual void rejectedExecution(decaf::lang::Runnable* task, ThreadPoolExecutor* executer DECAF_UNUSED) { + delete task; + throw RejectedExecutionException(__FILE__, __LINE__, "Unable to execute task."); + } + + }; + + /** + * Handler policy for tasks that are rejected upon a call to ThreadPoolExecutor::execute + * this class will attempt to run the task in the Thread that called the execute method + * unless the executor is shutdown in which case the task is not run and is destroyed.. + * + * @since 1.0 + */ + class CallerRunsPolicy : public RejectedExecutionHandler { + public: + + CallerRunsPolicy() : RejectedExecutionHandler() { + } + + virtual ~CallerRunsPolicy() { + } + + virtual void rejectedExecution(decaf::lang::Runnable* task, ThreadPoolExecutor* executer DECAF_UNUSED) { + + if (executer->isShutdown()) { + delete task; + return; + } + + try{ + task->run(); + } catch(decaf::lang::Exception& ex) { + delete task; + throw ex; + } + } + + /** + * Handler policy for tasks that are rejected upon a call to ThreadPoolExecutor::execute + * this class always destroys the rejected task and returns quietly. + * + * @since 1.0 + */ + class DiscardPolicy : public RejectedExecutionHandler { + public: + + DiscardPolicy() : RejectedExecutionHandler() { + } + + virtual ~DiscardPolicy() { + } + + virtual void rejectedExecution(decaf::lang::Runnable* task, ThreadPoolExecutor* executer DECAF_UNUSED) { + delete task; + } + + }; + + /** + * Handler policy for tasks that are rejected upon a call to ThreadPoolExecutor::execute + * this class always destroys the oldest unexecuted task in the Queue and then attempts + * to execute the rejected task using the passed in executor.. + * + * @since 1.0 + */ + class DiscardOldestPolicy : public RejectedExecutionHandler { + public: + + DiscardOldestPolicy() : RejectedExecutionHandler() { + } + + virtual ~DiscardOldestPolicy() { + } + + virtual void rejectedExecution( decaf::lang::Runnable* task, ThreadPoolExecutor* executer ) { + + if (executer->isShutdown()) { + delete task; + return; + } + + try{ + + decaf::lang::Runnable* oldest = NULL; + executer->getQueue()->poll(oldest); + delete oldest; + + executer->execute(task); + } catch(decaf::lang::Exception& ex) { + delete task; + throw ex; + } + } + + }; + + }; }; }}}
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractOwnableSynchronizer.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractOwnableSynchronizer.cpp?rev=1091195&view=auto ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractOwnableSynchronizer.cpp (added) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractOwnableSynchronizer.cpp Mon Apr 11 20:22:39 2011 @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "AbstractOwnableSynchronizer.h" + +using namespace decaf; +using namespace decaf::lang; +using namespace decaf::util; +using namespace decaf::util::concurrent; +using namespace decaf::util::concurrent::locks; + +//////////////////////////////////////////////////////////////////////////////// +AbstractOwnableSynchronizer::AbstractOwnableSynchronizer() : ownerThread() { +} + +//////////////////////////////////////////////////////////////////////////////// +AbstractOwnableSynchronizer::~AbstractOwnableSynchronizer() { +} + +//////////////////////////////////////////////////////////////////////////////// +decaf::lang::Thread* AbstractOwnableSynchronizer::getExclusiveOwnerThread() const { + return this->ownerThread; +} + +//////////////////////////////////////////////////////////////////////////////// +void AbstractOwnableSynchronizer::setExclusiveOwnerThread(decaf::lang::Thread* thread) { + this->ownerThread = thread; +} Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractOwnableSynchronizer.cpp ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractOwnableSynchronizer.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractOwnableSynchronizer.h?rev=1091195&view=auto ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractOwnableSynchronizer.h (added) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractOwnableSynchronizer.h Mon Apr 11 20:22:39 2011 @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _DECAF_UTIL_CONCURRENT_ABSTRACTOWNABLESYNCHRONIZER_H_ +#define _DECAF_UTIL_CONCURRENT_ABSTRACTOWNABLESYNCHRONIZER_H_ + +#include <decaf/util/Config.h> + +namespace decaf { +namespace lang { + class Thread; +} +namespace util { +namespace concurrent { +namespace locks { + + /** + * Base class for locks that provide the notion of Ownership, the types of locks + * that are implemented using this base class would be owned by one specific Thread + * at any given time. + * + * @since 1.0 + */ + class DECAF_API AbstractOwnableSynchronizer { + private: + + decaf::lang::Thread* ownerThread; + + public: + + virtual ~AbstractOwnableSynchronizer(); + + protected: + + AbstractOwnableSynchronizer(); + + /** + * Gets the Thread that was last set using the setExclusiveOwnerThread method, or NULL + * if no Thread has been made the exclusive owner. + * + * @return pointer to the owner Thread or NULL if not set. + */ + decaf::lang::Thread* getExclusiveOwnerThread() const; + + /** + * Sets the Thread that has exclusive ownership of this Synchronizer, can be NULL + * to indicate that no Thread now owns this Synchronizer. + * + * @param thread + * The Thread that now has ownership, or NULL if ownership is released. + */ + void setExclusiveOwnerThread(decaf::lang::Thread* thread); + + }; + +}}}} + +#endif /* _DECAF_UTIL_CONCURRENT_ABSTRACTOWNABLESYNCHRONIZER_H_ */ Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractOwnableSynchronizer.h ------------------------------------------------------------------------------ svn:eol-style = native Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am?rev=1091195&r1=1091194&r2=1091195&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am Mon Apr 11 20:22:39 2011 @@ -207,6 +207,7 @@ cc_sources = \ decaf/util/concurrent/CopyOnWriteArrayListTest.cpp \ decaf/util/concurrent/CopyOnWriteArraySetTest.cpp \ decaf/util/concurrent/CountDownLatchTest.cpp \ + decaf/util/concurrent/ExecutorsTest.cpp \ decaf/util/concurrent/LinkedBlockingQueueTest.cpp \ decaf/util/concurrent/MutexTest.cpp \ decaf/util/concurrent/SynchronousQueueTest.cpp \ @@ -429,6 +430,7 @@ h_sources = \ decaf/util/concurrent/CopyOnWriteArrayListTest.h \ decaf/util/concurrent/CopyOnWriteArraySetTest.h \ decaf/util/concurrent/CountDownLatchTest.h \ + decaf/util/concurrent/ExecutorsTest.h \ decaf/util/concurrent/LinkedBlockingQueueTest.h \ decaf/util/concurrent/MutexTest.h \ decaf/util/concurrent/SynchronousQueueTest.h \ Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTest.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTest.cpp?rev=1091195&view=auto ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTest.cpp (added) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTest.cpp Mon Apr 11 20:22:39 2011 @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ExecutorsTest.h" + +#include <decaf/lang/Pointer.h> +#include <decaf/util/concurrent/ThreadPoolExecutor.h> +#include <decaf/util/concurrent/Executors.h> +#include <decaf/util/concurrent/CountDownLatch.h> + +using namespace std; +using namespace decaf; +using namespace decaf::lang; +using namespace decaf::lang::exceptions; +using namespace decaf::util; +using namespace decaf::util::concurrent; + +//////////////////////////////////////////////////////////////////////////////// +namespace { + + class DefaultThreadFactoryRunnable : public Runnable { + private: + + CountDownLatch* shutdown; + + public: + + DefaultThreadFactoryRunnable(CountDownLatch* shutdown) : Runnable(), shutdown(shutdown) { + } + + virtual ~DefaultThreadFactoryRunnable() {} + + virtual void run() { + this->shutdown->await(); + } + + void signalDone() { + this->shutdown->countDown(); + } + }; + + class NoOpRunnable : public Runnable { + public: + + NoOpRunnable() : Runnable() { + } + + virtual ~NoOpRunnable() {} + + virtual void run() { + } + }; + + class SimpleThreadFactory : public ThreadFactory{ + public: + + virtual Thread* newThread(Runnable* task) { + return new Thread(task); + } + }; + + void joinPool(Pointer<ExecutorService>& exec) { + try { + exec->shutdown(); + CPPUNIT_ASSERT(exec->awaitTermination(5000, TimeUnit::MILLISECONDS)); + } catch(InterruptedException& ie) { + CPPUNIT_FAIL("Unexpected exception"); + } + } + +} + +//////////////////////////////////////////////////////////////////////////////// +ExecutorsTest::ExecutorsTest() { +} + +//////////////////////////////////////////////////////////////////////////////// +ExecutorsTest::~ExecutorsTest() { +} + +//////////////////////////////////////////////////////////////////////////////// +void ExecutorsTest::testDefaultThreadFactory() { + + CountDownLatch shutdown(1); + Pointer<ThreadFactory> defaultFactory; + DefaultThreadFactoryRunnable* runner = new DefaultThreadFactoryRunnable(&shutdown); + + defaultFactory.reset(Executors::getDefaultThreadFactory()); + + Thread* theThread = defaultFactory->newThread(runner); + + CPPUNIT_ASSERT(theThread != NULL); + CPPUNIT_ASSERT_EQUAL(false, theThread->isDaemon()); + const int expected = Thread::NORM_PRIORITY; + CPPUNIT_ASSERT_EQUAL(expected, theThread->getPriority()); + + theThread->start(); + + shutdown.countDown(); + theThread->join(); + + delete theThread; + delete runner; +} + +//////////////////////////////////////////////////////////////////////////////// +void ExecutorsTest::testNewFixedThreadPool1() { + Pointer<ExecutorService> e(Executors::newFixedThreadPool(2)); + + e->execute(new NoOpRunnable()); + e->execute(new NoOpRunnable()); + e->execute(new NoOpRunnable()); + + joinPool(e); +} + +//////////////////////////////////////////////////////////////////////////////// +void ExecutorsTest::testNewFixedThreadPool2() { + + Pointer<ExecutorService> e(Executors::newFixedThreadPool(2, new SimpleThreadFactory())); + + e->execute(new NoOpRunnable()); + e->execute(new NoOpRunnable()); + e->execute(new NoOpRunnable()); + + joinPool(e); +} + +//////////////////////////////////////////////////////////////////////////////// +void ExecutorsTest::testNewFixedThreadPool3() { + + CPPUNIT_ASSERT_THROW_MESSAGE( + "Should throw a NullPointerException", + Executors::newFixedThreadPool(2, NULL), + NullPointerException); +} + +//////////////////////////////////////////////////////////////////////////////// +void ExecutorsTest::testNewFixedThreadPool4() { + + CPPUNIT_ASSERT_THROW_MESSAGE( + "Should throw a IllegalArgumentException", + Executors::newFixedThreadPool(0), + IllegalArgumentException); +} Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTest.cpp ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTest.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTest.h?rev=1091195&view=auto ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTest.h (added) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTest.h Mon Apr 11 20:22:39 2011 @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _DECAF_UTIL_CONCURRENT_EXECUTORSTEST_H_ +#define _DECAF_UTIL_CONCURRENT_EXECUTORSTEST_H_ + +#include <cppunit/TestFixture.h> +#include <cppunit/extensions/HelperMacros.h> + +namespace decaf { +namespace util { +namespace concurrent { + + class ExecutorsTest : public CppUnit::TestFixture { + + CPPUNIT_TEST_SUITE( ExecutorsTest ); + CPPUNIT_TEST( testDefaultThreadFactory ); + CPPUNIT_TEST( testNewFixedThreadPool1 ); + CPPUNIT_TEST( testNewFixedThreadPool2 ); + CPPUNIT_TEST( testNewFixedThreadPool3 ); + CPPUNIT_TEST( testNewFixedThreadPool4 ); + CPPUNIT_TEST_SUITE_END(); + + public: + + ExecutorsTest(); + virtual ~ExecutorsTest(); + + void testDefaultThreadFactory(); + void testNewFixedThreadPool1(); + void testNewFixedThreadPool2(); + void testNewFixedThreadPool3(); + void testNewFixedThreadPool4(); + + }; + +}}} + +#endif /* _DECAF_UTIL_CONCURRENT_EXECUTORSTEST_H_ */ Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTest.h ------------------------------------------------------------------------------ svn:eol-style = native Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp?rev=1091195&r1=1091194&r2=1091195&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp Mon Apr 11 20:22:39 2011 @@ -275,6 +275,8 @@ CPPUNIT_TEST_SUITE_REGISTRATION( decaf:: CPPUNIT_TEST_SUITE_REGISTRATION( decaf::util::concurrent::MutexTest ); #include <decaf/util/concurrent/ThreadPoolExecutorTest.h> CPPUNIT_TEST_SUITE_REGISTRATION( decaf::util::concurrent::ThreadPoolExecutorTest ); +#include <decaf/util/concurrent/ExecutorsTest.h> +CPPUNIT_TEST_SUITE_REGISTRATION( decaf::util::concurrent::ExecutorsTest ); #include <decaf/util/concurrent/TimeUnitTest.h> CPPUNIT_TEST_SUITE_REGISTRATION( decaf::util::concurrent::TimeUnitTest ); #include <decaf/util/concurrent/LinkedBlockingQueueTest.h>
