Author: giacomo Date: Fri Nov 5 14:47:21 2004 New Revision: 56701 Modified: cocoon/trunk/src/java/org/apache/cocoon/cocoon.roles cocoon/trunk/src/java/org/apache/cocoon/components/flow/ContinuationsManagerImpl.java cocoon/trunk/src/java/org/apache/cocoon/components/thread/DefaultRunnableManager.java cocoon/trunk/src/webapp/WEB-INF/cocoon.xconf Log: - made DefaultRunnableManager able to start and stop - added DefaultRunnableManager to roles and cocoon.xconf - made ContinuationsManagerImpl use the RunnableManager for expired-continuation checking
Modified: cocoon/trunk/src/java/org/apache/cocoon/cocoon.roles ============================================================================== --- cocoon/trunk/src/java/org/apache/cocoon/cocoon.roles (original) +++ cocoon/trunk/src/java/org/apache/cocoon/cocoon.roles Fri Nov 5 14:47:21 2004 @@ -218,5 +218,10 @@ <role name="org.apache.cocoon.components.persistence.RequestDataStore" shorthand="request-data-store" default-class="org.apache.cocoon.components.persistence.RequestDataStoreImpl"/> + + <!-- Running commands (Runnable) in background --> + <role name="org.apache.cocoon.components.thread.RunnableManager" + shorthand="runnable-manager" + default-class="org.apache.cocoon.components.thread.DefaultRunnableManager"/> </role-list> Modified: cocoon/trunk/src/java/org/apache/cocoon/components/flow/ContinuationsManagerImpl.java ============================================================================== --- cocoon/trunk/src/java/org/apache/cocoon/components/flow/ContinuationsManagerImpl.java (original) +++ cocoon/trunk/src/java/org/apache/cocoon/components/flow/ContinuationsManagerImpl.java Fri Nov 5 14:47:21 2004 @@ -30,7 +30,11 @@ import org.apache.avalon.framework.configuration.Configurable; import org.apache.avalon.framework.configuration.Configuration; import org.apache.avalon.framework.logger.AbstractLogEnabled; +import org.apache.avalon.framework.service.ServiceException; +import org.apache.avalon.framework.service.ServiceManager; +import org.apache.avalon.framework.service.Serviceable; import org.apache.avalon.framework.thread.ThreadSafe; +import org.apache.cocoon.components.thread.RunnableManager; /** * The default implementation of {@link ContinuationsManager}. 
@@ -43,7 +47,7 @@ */ public class ContinuationsManagerImpl extends AbstractLogEnabled - implements ContinuationsManager, Configurable, Disposable, ThreadSafe { + implements ContinuationsManager, Configurable, Disposable, ThreadSafe, Serviceable { static final int CONTINUATION_ID_LENGTH = 20; static final String EXPIRE_CONTINUATIONS = "expire-continuations"; @@ -54,8 +58,6 @@ protected SecureRandom random; protected byte[] bytes; - protected ContinuationInterrupt interrupt; - /** * How long does a continuation exist in memory since the last * access? The time is in miliseconds, and the default is 1 hour. @@ -84,6 +86,7 @@ private String instrumentableName; + private ServiceManager serviceManager; public ContinuationsManagerImpl() throws Exception { try { @@ -103,13 +106,17 @@ this.defaultTimeToLive = config.getAttributeAsInteger("time-to-live", (3600 * 1000)); final Configuration expireConf = config.getChild("expirations-check"); + final long initialDelay = expireConf.getChild("offset", true).getValueAsLong(180000); + final long interval = expireConf.getChild("period", true).getValueAsLong(180000); try { - this.interrupt = new ContinuationInterrupt(expireConf); - Thread thread = new Thread(interrupt); - thread.setDaemon(true); - thread.setName("continuation-interrupt"); - thread.start(); - Thread.yield(); + final RunnableManager runnableManager = (RunnableManager)serviceManager.lookup(RunnableManager.ROLE); + runnableManager.execute( new Runnable() { + public void run() + { + expireContinuations(); + } + }, initialDelay, interval); + serviceManager.release(runnableManager); } catch (Exception e) { getLogger().warn("Could not enqueue continuations expiration task. 
" + "Continuations will not automatically expire.", e); @@ -117,13 +124,22 @@ } /* (non-Javadoc) + * @see org.apache.avalon.framework.service.Serviceable#service() + */ + public void service( ServiceManager manager ) + throws ServiceException + { + this.serviceManager = manager; + } + + /* (non-Javadoc) * @see org.apache.avalon.framework.activity.Disposable#dispose() */ public void dispose() { // stop the thread - if ( this.interrupt != null ) { + /*if ( this.interrupt != null ) { this.interrupt.doRun = false; - } + }*/ } public WebContinuation createWebContinuation(Object kont, @@ -321,7 +337,7 @@ /** * Remove all continuations which have already expired. */ - private void expireContinuations() { + protected void expireContinuations() { long now = 0; if (getLogger().isDebugEnabled()) { now = System.currentTimeMillis(); @@ -352,47 +368,6 @@ displayAllContinuations(); displayExpireSet(); */ - } - } - - - final class ContinuationInterrupt implements Runnable { - private final long interval; - private final long initialDelay; - - public boolean doRun; - - /** - * @param expireConf - */ - public ContinuationInterrupt(Configuration expireConf) { - // only periodic time triggers are supported - this.initialDelay = expireConf.getChild("offset", true).getValueAsLong(100); - this.interval = expireConf.getChild("period", true).getValueAsLong(100); - } - - /** - * expire any continuations that need expiring. 
- */ - public void run() { - this.doRun = true; - if ( this.initialDelay > 0 ) { - // Sleep - try { - Thread.sleep(this.initialDelay); - } catch (InterruptedException ignore) { - // ignore - } - } - while (doRun) { - expireContinuations(); - // Sleep - try { - Thread.sleep(this.interval); - } catch (InterruptedException ignore) { - // ignore - } - } } } } Modified: cocoon/trunk/src/java/org/apache/cocoon/components/thread/DefaultRunnableManager.java ============================================================================== --- cocoon/trunk/src/java/org/apache/cocoon/components/thread/DefaultRunnableManager.java (original) +++ cocoon/trunk/src/java/org/apache/cocoon/components/thread/DefaultRunnableManager.java Fri Nov 5 14:47:21 2004 @@ -16,11 +16,14 @@ package org.apache.cocoon.components.thread; import org.apache.avalon.framework.activity.Disposable; +import org.apache.avalon.framework.activity.Initializable; +import org.apache.avalon.framework.activity.Startable; import org.apache.avalon.framework.configuration.Configurable; import org.apache.avalon.framework.configuration.Configuration; import org.apache.avalon.framework.configuration.ConfigurationException; import org.apache.avalon.framework.logger.AbstractLogEnabled; import org.apache.avalon.framework.logger.Logger; +import org.apache.avalon.framework.thread.ThreadSafe; import java.util.HashMap; import java.util.Iterator; @@ -59,7 +62,8 @@ */ public class DefaultRunnableManager extends AbstractLogEnabled - implements RunnableManager, Configurable, Disposable, Runnable + implements RunnableManager, Configurable, Disposable, Runnable, Startable, + ThreadSafe { //~ Static fields/initializers --------------------------------------------- @@ -80,7 +84,7 @@ public static final String DEFAULT_THREAD_PRIORITY = "NORM"; /** The default keep alive time */ - public static final long DEFAULT_KEEP_ALIVE_TIME = 20000L; + public static final long DEFAULT_KEEP_ALIVE_TIME = 60000L; /** The default way to shutdown 
gracefully */ public static final boolean DEFAULT_SHUTDOWN_GRACEFUL = false; @@ -93,17 +97,20 @@ //~ Instance fields -------------------------------------------------------- + /** + * Sorted set of <code>ExecutionInfo</code> instances, based on their next + * execution time. + */ + protected SortedSet m_executionInfo = new TreeSet( ); + /** The managed thread pools */ final Map m_pools = new HashMap( ); /** The configured default ThreadFactory class instance */ private Class m_defaultThreadFactoryClass; - /** - * Sorted set of <code>ExecutionInfo</code> instances, based on their next - * execution time. - */ - protected SortedSet m_executionInfo = new TreeSet( ); + /** Keep us running? */ + private boolean m_keepRunning = false; //~ Methods ---------------------------------------------------------------- @@ -255,13 +262,40 @@ */ public void dispose( ) { + if( getLogger( ).isDebugEnabled( ) ) + { + getLogger( ).debug( "Disposing all thread pools" ); + } + for( final Iterator i = m_pools.keySet( ).iterator( ); i.hasNext( ); ) { - final DefaultThreadPool pool = (DefaultThreadPool)i.next( ); + final String poolName = (String)i.next( ); + final DefaultThreadPool pool = + (DefaultThreadPool)m_pools.get( poolName ); + + if( getLogger( ).isDebugEnabled( ) ) + { + getLogger( ).debug( "Disposing thread pool " + + pool.getName( ) ); + } + pool.shutdown( ); + + if( getLogger( ).isDebugEnabled( ) ) + { + getLogger( ).debug( "Thread pool " + pool.getName( ) + + " disposed" ); + } } - m_pools.clear( ); + try + { + m_pools.clear( ); + } + catch( final Throwable t ) + { + getLogger( ).error( "Cannot dispose", t ); + } } /** @@ -271,6 +305,8 @@ * @param command The [EMAIL PROTECTED] Runnable} to execute * @param delay the delay befor first run * @param interval The interval for repeated runs + * + * @throws IllegalArgumentException DOCUMENT ME! 
*/ public void execute( final String threadPoolName, final Runnable command, @@ -281,10 +317,12 @@ { throw new IllegalArgumentException( "delay < 0" ); } + if( interval < 0 ) { throw new IllegalArgumentException( "interval < 0" ); } + ThreadPool pool = (ThreadPool)m_pools.get( threadPoolName ); if( null == pool ) @@ -295,6 +333,13 @@ pool = (ThreadPool)m_pools.get( DEFAULT_THREADPOOL_NAME ); } + if( getLogger( ).isDebugEnabled( ) ) + { + getLogger( ).debug( "Command entered: " + command.toString( ) + + ",pool=" + pool.getName( ) + ",delay=" + + delay + ",interval=" + interval ); + } + new ExecutionInfo( pool, command, delay, interval, getLogger( ) ); } @@ -365,7 +410,12 @@ */ public void run( ) { - while( true ) + if( getLogger( ).isDebugEnabled( ) ) + { + getLogger( ).debug( "Entering loop" ); + } + + while( m_keepRunning ) { synchronized( m_executionInfo ) { @@ -385,31 +435,74 @@ } else { - if(getLogger().isDebugEnabled() ) + if( getLogger( ).isDebugEnabled( ) ) { - getLogger().debug( "No commands available. Will just wait for one" ); + getLogger( ).debug( "No commands available. Will just wait for one" ); } + m_executionInfo.wait( ); } } catch( final InterruptedException ie ) { - if(getLogger().isDebugEnabled() ) + if( getLogger( ).isDebugEnabled( ) ) { - getLogger().debug( "I've been interrupted" ); + getLogger( ).debug( "I've been interrupted" ); } } - final ExecutionInfo info = (ExecutionInfo)m_executionInfo.first( ); - final long delay = - info.m_nextRun - System.currentTimeMillis( ); - - if( delay < 0 ) + if( m_keepRunning ) { - info.execute( ); + final ExecutionInfo info = + (ExecutionInfo)m_executionInfo.first( ); + final long delay = + info.m_nextRun - System.currentTimeMillis( ); + + if( delay < 0 ) + { + info.execute( ); + } } } } + + if( getLogger( ).isDebugEnabled( ) ) + { + getLogger( ).debug( "Exiting loop" ); + } + } + + /** + * Start the managing thread + * + * @throws Exception DOCUMENT ME! 
+ */ + public void start( ) + throws Exception + { + if( getLogger( ).isDebugEnabled( ) ) + { + getLogger( ).debug( "starting heart" ); + } + + m_keepRunning = true; + ( (ThreadPool)m_pools.get( DEFAULT_THREADPOOL_NAME ) ).execute( this ); + } + + /** + * Stop the managing thread + * + * @throws Exception DOCUMENT ME! + */ + public void stop( ) + throws Exception + { + m_keepRunning = false; + + synchronized( m_executionInfo ) + { + m_executionInfo.notifyAll( ); + } } /** @@ -636,12 +729,19 @@ */ void execute( ) { + if( getLogger( ).isDebugEnabled( ) ) + { + getLogger( ).debug( "Executing Command: " + + m_command.toString( ) + ",pool=" + + m_pool.getName( ) + ",delay=" + m_delay + + ",interval=" + m_interval ); + } + synchronized( m_executionInfo ) { m_executionInfo.remove( this ); m_nextRun = ( ( m_interval > 0 ) - ? ( System.currentTimeMillis( ) + m_interval ) : 0 ); - + ? ( System.currentTimeMillis( ) + m_interval ) : 0 ); if( m_nextRun > 0 ) { Modified: cocoon/trunk/src/webapp/WEB-INF/cocoon.xconf ============================================================================== --- cocoon/trunk/src/webapp/WEB-INF/cocoon.xconf (original) +++ cocoon/trunk/src/webapp/WEB-INF/cocoon.xconf Fri Nov 5 14:47:21 2004 @@ -548,5 +548,93 @@ <parameter name="preemptive-loader-url" value="http://localhost:8080/cocoon/samples/cinclude/loader"/> --> - </component> + </component> + + <!--+ + | Runnable manager + | + | this component manages commands (Runnables) executed in background using + | preconfigured pools of worker threads + +--> + <runnable-manager logger="core.runnable"> + <!--+ + | This is the default configuration of the runnable-manager. More + | indepth information can be found at + | http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/PooledExecutor.html + | The following elements can be used: + | + | thread-factory: specifies the fully qualified class name of an + | org.apache.cocoon.components.thread.ThreadFactory + | implementation. 
It is responsible for creating Thread + | classes. + | thread-pools: container element for thread-pool elements. + | name: required name of the pool. + | priority: optional priority all threads of the pool will + | have (the ThreadFactory will be set to this + | priority). The possible values are: + | MIN: corresponds to Thread#MIN_PRIORITY + | NORM: corresponds to Thread#NORM_PRIORITY (default) + | MAX: corresponds to Thread#MAX_PRIORITY + | queue-size: optional size of a queue to hold Runnables if the + | pool is full. Possible values are: + | less than 0: unbounded (default) + | equal to 0: no queue at all + | greater than 0: size of the queue + | max-pool-size: optional maximum number of threads in the pool. + | Defaults to 5. + | NOTE: if a queue is specified (queue-size != 0) + | this value will be ignored. + | min-pool-size: optional minimum number of threads in the pool. + | Defaults to 5. + | NOTE: if a queue has been specified (queue-size != 0) + | this value will be used as the maximum number of + | threads running concurrently. + | keep-alive-time-ms: The time in ms an idle thread should be kept alive + | before it might get garbage collected. This + | defaults to 60000 ms. + | block-policy: The policy to be used if all resources (threads in + | the pool and slots in the queue) are exhausted. + | Possible values are: + | ABORT: Throw a RuntimeException + | DISCARD: Throw away the current request + | and return. + | DISCARDOLDEST: Throw away the oldest request + | and return. + | RUN (default): The thread making the execute + | request runs the task itself. + | This policy helps guard against + | lockup. + | WAIT: Wait until a thread becomes + | available. This policy should, in + | general, not be used if the + | minimum number of threads is zero, + | in which case a thread may never + | become available. + | shutdown-graceful: Terminate thread pool after processing all + | Runnables currently in queue. Any Runnable entered + | after this point will be discarded. 
A shut down + | pool cannot be restarted. This also means that a + | pool will need keep-alive-time-ms to terminate. + | The default is not to shut down gracefully. + | shutdown-wait-time-ms: The time in ms to wait before issuing an + | immediate shutdown after a graceful shutdown + | has been requested. + +--> + <!-- + <thread-factory>org.apache.cocoon.components.thread.DefaultThreadFactory</thread-factory> + <thread-pools> + <thread-pool> + <name>default</name> + <priority>NORM</priority> + <queue-size>-1</queue-size> + <max-pool-size>5</max-pool-size> + <min-pool-size>5</min-pool-size> + <keep-alive-time-ms>60000</keep-alive-time-ms> + <block-policy>RUN</block-policy> + <shutdown-graceful>false</shutdown-graceful> + <shutdown-wait-time-ms>-1</shutdown-wait-time-ms> + </thread-pool> + </thread-pools> + --> + </runnable-manager> </cocoon>