bloritsch 2003/01/27 11:54:54 Modified: event/src/java/org/apache/excalibur/event/command AbstractThreadManager.java DefaultThreadManager.java TPCThreadManager.java Log: use util.concurrent and simplify the ThreadManager Revision Changes Path 1.15 +52 -214 jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command/AbstractThreadManager.java Index: AbstractThreadManager.java =================================================================== RCS file: /home/cvs/jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command/AbstractThreadManager.java,v retrieving revision 1.14 retrieving revision 1.15 diff -u -r1.14 -r1.15 --- AbstractThreadManager.java 27 Jan 2003 18:26:04 -0000 1.14 +++ AbstractThreadManager.java 27 Jan 2003 19:54:54 -0000 1.15 @@ -49,19 +49,17 @@ */ package org.apache.excalibur.event.command; -import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedList; + +import org.apache.commons.collections.StaticBucketMap; import org.apache.avalon.framework.activity.Disposable; import org.apache.avalon.framework.activity.Initializable; import org.apache.avalon.framework.logger.AbstractLogEnabled; import org.apache.excalibur.event.EventHandler; import org.apache.excalibur.event.Source; -import org.apache.excalibur.thread.ThreadControl; -import org.apache.excalibur.thread.ThreadPool; -import EDU.oswego.cs.dl.util.concurrent.ReentrantLock; +import EDU.oswego.cs.dl.util.concurrent.Executor; /** * Abstract base class for a ThreadManager that has a single ThreadPool for @@ -73,20 +71,11 @@ public abstract class AbstractThreadManager extends AbstractLogEnabled implements Runnable, ThreadManager, Initializable, Disposable { - /** The Mutex used in this ThreadManager */ - private final ReentrantLock m_mutex = new ReentrantLock(); - /** The pipelines we are managing */ - private final HashMap m_pipelines = new HashMap(); - - /** The controls we have */ - private final LinkedList m_controls = new LinkedList(); + private final StaticBucketMap m_pipelines = new StaticBucketMap(); /** The ThreadPool we are using */ - private ThreadPool m_threadPool; - - /** The ThreadControl for the ThreadManager itself */ - private ThreadControl m_threadControl; + private Executor m_executor; /** Whether we are done or not */ private volatile boolean m_done = false; @@ -114,19 +103,27 @@ } /** + * Get the current amount of sleep time. + */ + protected long getSleepTime() + { + return m_sleepTime; + } + + /** * Set the ThreadPool we are using * * @param threadPool The ThreadPool */ - protected void setThreadPool( ThreadPool threadPool ) + protected void setExecutor( Executor executor ) { - if( null == m_threadPool ) + if( null == m_executor ) { - m_threadPool = threadPool; + m_executor = executor; } else { - throw new IllegalStateException( "Can only set thread pool once" ); + throw new IllegalStateException( "Can only set the executor once" ); } } @@ -137,12 +134,12 @@ */ public void initialize() throws Exception { - if( null == m_threadPool ) + if( null == m_executor ) { throw new IllegalStateException( "No thread pool set" ); } - this.m_threadControl = this.m_threadPool.execute( this ); + m_executor.execute( this ); this.m_initialized = true; } @@ -161,26 +158,18 @@ try { - m_mutex.acquire(); + PipelineRunner runner = new PipelineRunner( pipeline ); + runner.enableLogging( getLogger() ); + m_pipelines.put( pipeline, runner ); - try + if( m_done ) { - PipelineRunner runner = new PipelineRunner( pipeline ); - runner.enableLogging( getLogger() ); - m_pipelines.put( pipeline, runner ); - - if( m_done ) - { - m_threadControl = m_threadPool.execute( this ); - } - } - finally - { - m_mutex.release(); + m_executor.execute( this ); } } catch( InterruptedException ie ) { + getLogger().warn("Caught InterruptedException in register", ie); // ignore for now } } @@ -198,36 +187,11 @@ + "deregistering a pipeline" ); } - try - { - m_mutex.acquire(); - - m_pipelines.remove( pipeline ); + m_pipelines.remove( pipeline ); - if( m_pipelines.isEmpty() ) - { - m_done = true; - } - } - catch (InterruptedException ie) + if( m_pipelines.isEmpty() ) { - getLogger().warn( "deregister(" + pipeline + ") threw an InterruptedException", ie ); - } - finally - { - m_mutex.release(); - } - - if( m_done ) - { - try - { - m_threadControl.join( 1000 ); - } - catch(InterruptedException ie) - { - getLogger().warn( "deregister(" + pipeline + ") threw an InterruptedException", ie ); - } + m_done = true; } } @@ -242,55 +206,17 @@ + "before deregistering pipelines" ); } - try - { - // Aquire mutex to clear pipelines and set the m_done flag - m_mutex.acquire(); - - m_pipelines.clear(); - - m_done = true; - Iterator it = m_controls.iterator(); - - while( it.hasNext() ) - { - ThreadControl tc = (ThreadControl) it.next(); - - try - { - tc.join( 1000 ); - } - catch (InterruptedException e) - { - tc.interrupt(); - } - - getLogger().debug("disposed of another ThreadControl"); - } - - if ( ! m_pipelines.isEmpty() ) - { - throw new IllegalStateException("We still have pipelines, but no runners are available!"); - } - - } - catch (InterruptedException ie) - { - getLogger().warn( "deregisterAl() threw an InterruptedException", ie ); - } - finally + Iterator it = m_pipelines.keySet().iterator(); + while ( it.hasNext() ) { - // C.K. We must release the mutex to give the manager thread a chance to terminate. - m_mutex.release(); + deregister( (EventPipeline) it.next() ); } - try - { - m_threadControl.join( 1000 ); - } - catch (InterruptedException ie) + m_done = true; + + if ( ! m_pipelines.isEmpty() ) { - getLogger().warn( "deregisterAl() threw an InterruptedException", ie ); + throw new IllegalStateException("We still have pipelines, but no runners are available!"); } } @@ -303,133 +229,45 @@ deregisterAll(); doDispose(); - - if( m_threadControl != null && !m_threadControl.isFinished() ) - { - if( getLogger().isErrorEnabled() ) - { - getLogger().error( "The ThreadManager management thread is still active." ); - } - } - - m_threadControl = null; } protected void doDispose() {} // default impl to work with released code /** - * Return the thread controlls of all active threads - * (excluding the ThreadManager management thread) - */ - protected ThreadControl[] getThreadControls() - { - try - { - m_mutex.acquire(); - return ( ThreadControl[] ) m_controls.toArray( new ThreadControl[0] ); - } - catch( InterruptedException ie ) - { - return new ThreadControl[0]; - } - finally - { - m_mutex.release(); - } - } - - /** * The code that is run in the background to manage the ThreadPool and the * EventPipelines */ public void run() { - try + while( !m_done ) { - while( !m_done ) - { - try - { - m_mutex.acquire(); - - Iterator i = m_pipelines.values().iterator(); - - while( i.hasNext() ) - { - PipelineRunner nextRunner = ( PipelineRunner ) i.next(); - ThreadControl control = null; + Iterator i = m_pipelines.values().iterator(); - while( control == null ) - { - try - { - control = m_threadPool.execute( nextRunner ); - } - catch( IllegalStateException e ) - { - // that's the way ResourceLimitingThreadPool reports - // that it has no threads available, will still try - // to go on, hopefully at one point there will be - // a thread to execute our runner - - if( getLogger().isWarnEnabled() ) - { - getLogger().warn( "Unable to execute pipeline (If out of threads, " - + "increase block-timeout or number of threads " - + "per processor", e ); - } - } - - if (getLogger().isDebugEnabled()) - { - getLogger().debug( "Waiting on " + control ); - } - } + while( i.hasNext() ) + { + PipelineRunner nextRunner = ( PipelineRunner ) i.next(); - m_controls.add( control ); - } - } - finally + try { - m_mutex.release(); + m_executor.execute( nextRunner ); } - - Thread.sleep( m_sleepTime ); - - try + catch( Exception e ) { - m_mutex.acquire(); - - Iterator it = m_controls.iterator(); - - while( it.hasNext() ) + if( getLogger().isErrorEnabled() ) { - ThreadControl control = ( ThreadControl ) it.next(); - if( control.isFinished() ) - { - it.remove(); - } + getLogger().error( "Caught exception in ThreadManager management thread", e ); } } - finally - { - m_mutex.release(); - } } - } - catch( InterruptedException e ) - { - Thread.interrupted(); - } - catch( RuntimeException e ) - { - if( getLogger().isFatalErrorEnabled() ) + + try + { + Thread.sleep( m_sleepTime ); + } + catch( InterruptedException e ) { - getLogger().fatalError( "ThreadManager management thread aborting " - + " due to exception", e ); + Thread.interrupted(); } - - throw e; } } 1.5 +3 -3 jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command/DefaultThreadManager.java Index: DefaultThreadManager.java =================================================================== RCS file: /home/cvs/jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command/DefaultThreadManager.java,v retrieving revision 1.4 retrieving revision 1.5 diff -u -r1.4 -r1.5 --- DefaultThreadManager.java 30 Sep 2002 16:17:01 -0000 1.4 +++ DefaultThreadManager.java 27 Jan 2003 19:54:54 -0000 1.5 @@ -49,7 +49,7 @@ */ package org.apache.excalibur.event.command; -import org.apache.excalibur.thread.ThreadPool; +import EDU.oswego.cs.dl.util.concurrent.Executor; /** * A ThreadManager that will use an external ThreadPool. This will be useful @@ -68,8 +68,8 @@ * * @param pool The ThreadPool we will use. */ - public DefaultThreadManager( final ThreadPool pool ) + public DefaultThreadManager( final Executor executor ) { - setThreadPool( pool ); + setExecutor( executor ); } } 1.35 +26 -14 jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command/TPCThreadManager.java Index: TPCThreadManager.java =================================================================== RCS file: /home/cvs/jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command/TPCThreadManager.java,v retrieving revision 1.34 retrieving revision 1.35 diff -u -r1.34 -r1.35 --- TPCThreadManager.java 27 Jan 2003 18:26:04 -0000 1.34 +++ TPCThreadManager.java 27 Jan 2003 19:54:54 -0000 1.35 @@ -53,9 +53,10 @@ import org.apache.avalon.framework.parameters.ParameterException; import org.apache.avalon.framework.parameters.Parameterizable; import org.apache.avalon.framework.parameters.Parameters; -import org.apache.excalibur.thread.ThreadControl; import org.apache.excalibur.util.SystemUtil; +import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; + /** * This is a ThreadManager that uses a certain number of threads per * processor. The number of threads in the pool is a direct proportion to @@ -67,10 +68,11 @@ */ public final class TPCThreadManager extends AbstractThreadManager implements Parameterizable { - private EventThreadPool m_tpool; + private PooledExecutor m_threadPool; private long m_blockTimeout = 1000L; private int m_processors = -1; private int m_threadsPerProcessor = 1; + private boolean m_hardShutdown = false; /** * The following parameters can be set for this class: @@ -118,6 +120,8 @@ setSleepTime( parameters.getParameterAsLong( "sleep-time", 1000L ) ); this.m_blockTimeout = parameters.getParameterAsLong( "block-timeout", 1000L ); + + this.m_hardShutdown = ( parameters.getParameterAsBoolean( "force-shutdown", false ) ); } public void initialize() throws Exception @@ -132,32 +136,40 @@ throw new IllegalStateException( "ThreadManager is already initailized" ); } - m_tpool = new EventThreadPool( "TPCThreadManager", - ( m_processors * m_threadsPerProcessor ) + 1, ( int ) m_blockTimeout ); + m_threadPool = new PooledExecutor(( m_processors * m_threadsPerProcessor ) + 1); + m_threadPool.setMinimumPoolSize( 2 ); // at least two threads + m_threadPool.setKeepAliveTime( getSleepTime() ); + m_threadPool.waitWhenBlocked(); if( null == getLogger() ) { this.enableLogging( new NullLogger() ); } - setThreadPool( m_tpool ); + setExecutor( m_threadPool ); super.initialize(); } protected final void doDispose() { - // We should dispose all active threads - final ThreadControl[] threads = getThreadControls(); - - for( int i = 0; i < threads.length; i++ ) + if ( m_hardShutdown ) + { + m_threadPool.shutdownNow(); + } + else { - if( !threads[i].isFinished() ) - { - m_tpool.dispose( threads[i] ); - } + m_threadPool.shutdownAfterProcessingCurrentlyQueuedTasks(); } - m_tpool.dispose(); + try + { + m_threadPool.awaitTerminationAfterShutdown( getSleepTime() ); + } + catch (InterruptedException ie) + { + getLogger().warn("Thread pool took longer than " + getSleepTime() + + " ms to shut down", ie); + } } }
-- To unsubscribe, e-mail: <mailto:[EMAIL PROTECTED]> For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>