Author: giacomo Date: Mon Nov 8 05:16:55 2004 New Revision: 56921 Added: cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/thread/ cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/thread/BoundedQueue.java cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/thread/ChannelWrapper.java cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/thread/DefaultRunnableManager.java cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/thread/DefaultThreadFactory.java cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/thread/DefaultThreadPool.java cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/thread/LinkedQueue.java cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/thread/Queue.java cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/thread/RunnableManager.java cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/thread/SynchronousChannel.java cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/thread/ThreadFactory.java cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/thread/ThreadPool.java Modified: cocoon/branches/BRANCH_2_1_X/gump.xml cocoon/branches/BRANCH_2_1_X/src/blocks/cron/java/org/apache/cocoon/components/cron/QuartzJobScheduler.java cocoon/branches/BRANCH_2_1_X/src/blocks/cron/samples/cron.js cocoon/branches/BRANCH_2_1_X/src/blocks/cron/samples/docs.xml cocoon/branches/BRANCH_2_1_X/src/blocks/hsqldb/conf/hsqldb.xconf cocoon/branches/BRANCH_2_1_X/src/blocks/hsqldb/java/org/apache/cocoon/components/hsqldb/ServerImpl.java cocoon/branches/BRANCH_2_1_X/src/blocks/portal/java/org/apache/cocoon/portal/coplet/adapter/impl/AbstractCopletAdapter.java cocoon/branches/BRANCH_2_1_X/src/blocks/scratchpad/java/org/apache/cocoon/transformation/IncludeTransformer.java cocoon/branches/BRANCH_2_1_X/src/blocks/xsp/java/org/apache/cocoon/components/language/programming/java/JavaLanguage.java 
cocoon/branches/BRANCH_2_1_X/src/blocks/xsp/java/org/apache/cocoon/components/language/programming/java/Jikes.java cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/cocoon.roles cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/flow/ContinuationsManagerImpl.java cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/transformation/helpers/DefaultIncludeCacheManager.java cocoon/branches/BRANCH_2_1_X/src/webapp/WEB-INF/cocoon.xconf cocoon/branches/BRANCH_2_1_X/status.xml Log: Added replacement for Excalibur Event package in org.apache.cocoon.components.thread and migrated most classes using their own threads to that package
Modified: cocoon/branches/BRANCH_2_1_X/gump.xml ============================================================================== --- cocoon/branches/BRANCH_2_1_X/gump.xml (original) +++ cocoon/branches/BRANCH_2_1_X/gump.xml Mon Nov 8 05:16:55 2004 @@ -83,6 +83,7 @@ <depend project="pizza"/> <depend project="junit"/> <depend project="eclipse-jtd"/> + <depend project="concurrent"/> <depend project="xmlunit"/><!-- used by the testcases --> <depend project="qdox"/> <!-- used by ant tasks --> @@ -1235,6 +1236,7 @@ </ant> <depend project="cocoon" inherit="all"/> + <depend project="concurrent"/> <library name="pizza"/> Modified: cocoon/branches/BRANCH_2_1_X/src/blocks/cron/java/org/apache/cocoon/components/cron/QuartzJobScheduler.java ============================================================================== --- cocoon/branches/BRANCH_2_1_X/src/blocks/cron/java/org/apache/cocoon/components/cron/QuartzJobScheduler.java (original) +++ cocoon/branches/BRANCH_2_1_X/src/blocks/cron/java/org/apache/cocoon/components/cron/QuartzJobScheduler.java Mon Nov 8 05:16:55 2004 @@ -38,6 +38,8 @@ import org.apache.avalon.framework.service.ServiceManager; import org.apache.avalon.framework.service.Serviceable; import org.apache.avalon.framework.thread.ThreadSafe; +import org.apache.cocoon.components.thread.RunnableManager; +import org.apache.cocoon.components.thread.ThreadPool; import org.quartz.CronTrigger; import org.quartz.Job; import org.quartz.JobDataMap; @@ -58,10 +60,6 @@ import org.quartz.utils.DBConnectionManager; import org.quartz.utils.JNDIConnectionProvider; -import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer; -import EDU.oswego.cs.dl.util.concurrent.LinkedQueue; -import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; - /** * This component can either schedule jobs or directly execute one. 
* @@ -75,19 +73,19 @@ Serviceable, Configurable, Startable, Disposable, Contextualizable, Initializable { - /** ThreadPool policy RUN */ + /** QuartzThreadPool policy RUN */ private static final String POLICY_RUN = "RUN"; - /** ThreadPool policy WAIT */ + /** QuartzThreadPool policy WAIT */ private static final String POLICY_WAIT = "WAIT"; - /** ThreadPool policy ABORT */ + /** QuartzThreadPool policy ABORT */ private static final String POLICY_ABORT = "ABORT"; - /** ThreadPool policy DISCARD */ + /** QuartzThreadPool policy DISCARD */ private static final String POLICY_DISCARD = "DISCARD"; - /** ThreadPool policy DISCARD-OLDEST */ + /** QuartzThreadPool policy DISCARD-OLDEST */ private static final String POLICY_DISCARD_OLDEST = "DISCARDOLDEST"; @@ -135,7 +133,7 @@ private Context context; /** The PooledExecutor instance */ - private PooledExecutor executor; + private ThreadPool executor; /** The quartz scheduler */ private Scheduler scheduler; @@ -317,7 +315,7 @@ // If cocoon reloads (or is it the container that reload us?) 
// we cannot create the same scheduler again final String runID = new Date().toString().replace(' ', '_'); - final ThreadPool pool = createThreadPool(this.config.getChild("thread-pool")); + final QuartzThreadPool pool = createThreadPool(this.config.getChild("thread-pool")); final JobStore store = createJobStore(DEFAULT_QUARTZ_SCHEDULER_NAME, runID, this.config.getChild("store")); DirectSchedulerFactory.getInstance().createScheduler(DEFAULT_QUARTZ_SCHEDULER_NAME, runID, pool, store); // scheduler = DirectSchedulerFactory.getInstance().getScheduler(DEFAULT_QUARTZ_SCHEDULER_NAME, runID); @@ -592,85 +590,33 @@ } /** - * Create a ThreadPool + * Create a QuartzThreadPool * * @param poolConfig Configuration element for the thread pool * - * @return ThreadPool + * @return QuartzThreadPool */ - private ThreadPool createThreadPool(final Configuration poolConfig) { + private QuartzThreadPool createThreadPool(final Configuration poolConfig) + throws ServiceException { final boolean useQueueing = poolConfig.getChild("use-queueing").getValueAsBoolean(false); final int queueSize = poolConfig.getChild("queue-size").getValueAsInteger(-1); - - if (useQueueing) { - if (queueSize > 0) { - this.executor = new PooledExecutor(new BoundedBuffer(queueSize)); - } else { - this.executor = new PooledExecutor(new LinkedQueue()); - } - } else { - this.executor = new PooledExecutor(); - } - final int maxPoolSize = poolConfig.getChild("max-pool-size").getValueAsInteger(-1); - - if (maxPoolSize > 0) { - this.executor.setMaximumPoolSize(maxPoolSize); - } else { - this.executor.setMaximumPoolSize(PooledExecutor.DEFAULT_MAXIMUMPOOLSIZE); - } - final int minPoolSize = poolConfig.getChild("min-pool-size").getValueAsInteger(-1); - - if (minPoolSize > 0) { - this.executor.setMinimumPoolSize(minPoolSize); - } else { - this.executor.setMinimumPoolSize(PooledExecutor.DEFAULT_MINIMUMPOOLSIZE); - } - final int keepAliveTimeMs = poolConfig.getChild("keep-alive-time-ms").getValueAsInteger(-1); - - if 
(keepAliveTimeMs > 0) { - this.executor.setKeepAliveTime(keepAliveTimeMs); - } else { - this.executor.setKeepAliveTime(PooledExecutor.DEFAULT_KEEPALIVETIME); - } - final String blockPolicy = poolConfig.getChild("block-policy").getValue(null); - - if (blockPolicy != null) { - if (blockPolicy.equalsIgnoreCase(POLICY_ABORT)) { - this.executor.abortWhenBlocked(); - } else if (blockPolicy.equalsIgnoreCase(POLICY_DISCARD)) { - this.executor.discardWhenBlocked(); - } else if (blockPolicy.equalsIgnoreCase(POLICY_DISCARD_OLDEST)) { - this.executor.discardOldestWhenBlocked(); - } else if (blockPolicy.equalsIgnoreCase(POLICY_RUN)) { - this.executor.runWhenBlocked(); - } else if (blockPolicy.equalsIgnoreCase(POLICY_WAIT)) { - this.executor.waitWhenBlocked(); - } else { - getLogger().warn("Unknown block-policy configuration '" + blockPolicy + "'. Should be one of '" + - POLICY_ABORT + "','" + POLICY_DISCARD + "','" + POLICY_DISCARD_OLDEST + "','" + - POLICY_RUN + "','" + POLICY_WAIT + "'. Will use '" + POLICY_RUN + "'"); - } - } - m_shutdownGraceful = poolConfig.getChild("shutdown-graceful").getValueAsBoolean(true); final int shutdownWaitTimeMs = poolConfig.getChild("shutdown-wait-time-ms").getValueAsInteger(-1); - final ThreadPool pool = new ThreadPool(this.executor, shutdownWaitTimeMs); - pool.enableLogging(getLogger()); - - if (getLogger().isInfoEnabled()) { - getLogger().info("using a PooledExecutor as ThreadPool with queueing=" + useQueueing + - (useQueueing ? (",queue-size=" + ((queueSize > 0) ? ("" + queueSize) : "default")) : "") + - ",max-pool-size=" + this.executor.getMaximumPoolSize() + ",min-pool-size=" + - this.executor.getMinimumPoolSize() + ",keep-alive-time-ms=" + this.executor.getKeepAliveTime() + - ",block-policy='" + blockPolicy + "',shutdown-wait-time-ms=" + - ((shutdownWaitTimeMs > 0) ? 
("" + shutdownWaitTimeMs) : "default")); - } - + final RunnableManager runnableManager = (RunnableManager)this.manager.lookup(RunnableManager.ROLE); + final QuartzThreadPool pool = new QuartzThreadPool(runnableManager.createPool(queueSize, + maxPoolSize, + minPoolSize, + Thread.NORM_PRIORITY, + false, // no daemon + keepAliveTimeMs, + blockPolicy, + m_shutdownGraceful, + shutdownWaitTimeMs)); return pool; } @@ -809,42 +755,38 @@ } /** - * A ThreadPool for the Quartz Scheduler based on Doug Leas concurrency utilities PooledExecutor + * A QuartzThreadPool for the Quartz Scheduler based on Doug Leas concurrency utilities PooledExecutor * * @author <a href="mailto:[EMAIL PROTECTED]">Giacomo Pati</a> * @version CVS $Id$ */ - private static class ThreadPool extends AbstractLogEnabled implements org.quartz.spi.ThreadPool { + private static class QuartzThreadPool extends AbstractLogEnabled implements org.quartz.spi.ThreadPool { /** Our executor thread pool */ - private PooledExecutor executor; - - /** How long to wait for running jobs to terminate on disposition */ - private int m_shutdownWaitTimeMs; + private ThreadPool executor; /** * */ - public ThreadPool(final PooledExecutor executor, final int shutownWaitTimeMs) { + public QuartzThreadPool(final ThreadPool executor) { super(); this.executor = executor; - m_shutdownWaitTimeMs = shutownWaitTimeMs; } /* (non-Javadoc) - * @see org.quartz.spi.ThreadPool#getPoolSize() + * @see org.quartz.spi.QuartzThreadPool#getPoolSize() */ public int getPoolSize() { return this.executor.getMaximumPoolSize(); } /* (non-Javadoc) - * @see org.quartz.spi.ThreadPool#initialize() + * @see org.quartz.spi.QuartzThreadPool#initialize() */ public void initialize() { } /* (non-Javadoc) - * @see org.quartz.spi.ThreadPool#runInThread(java.lang.Runnable) + * @see org.quartz.spi.QuartzThreadPool#runInThread(java.lang.Runnable) */ public boolean runInThread(final Runnable job) { try { @@ -857,29 +799,10 @@ } /* (non-Javadoc) - * @see 
org.quartz.spi.ThreadPool#shutdown(boolean) + * @see org.quartz.spi.QuartzThreadPool#shutdown(boolean) */ public void shutdown(final boolean waitForJobsToComplete) { - if (waitForJobsToComplete) { - this.executor.shutdownAfterProcessingCurrentlyQueuedTasks(); - } else { - this.executor.shutdownNow(); - } - - try { - if (m_shutdownWaitTimeMs > 0) { - if (!this.executor.awaitTerminationAfterShutdown(m_shutdownWaitTimeMs)) { - getLogger().warn("scheduled cron jobs are not terminating within " + m_shutdownWaitTimeMs + - "ms, Will shut them down by interruption"); - this.executor.interruptAll(); - this.executor.shutdownNow(); - } - } - - this.executor.awaitTerminationAfterShutdown(); - } catch (final InterruptedException ie) { - getLogger().error("cannot shutdown Executor", ie); - } + this.executor.shutdown(); } } } Modified: cocoon/branches/BRANCH_2_1_X/src/blocks/cron/samples/cron.js ============================================================================== --- cocoon/branches/BRANCH_2_1_X/src/blocks/cron/samples/cron.js (original) +++ cocoon/branches/BRANCH_2_1_X/src/blocks/cron/samples/cron.js Mon Nov 8 05:16:55 2004 @@ -44,6 +44,7 @@ var cronexpr = ""; var intervalexpr = ""; var atexpr = ""; + var i = 0; while( ! 
done ) { var jobnames = scheduler.getJobNames(); Modified: cocoon/branches/BRANCH_2_1_X/src/blocks/cron/samples/docs.xml ============================================================================== --- cocoon/branches/BRANCH_2_1_X/src/blocks/cron/samples/docs.xml (original) +++ cocoon/branches/BRANCH_2_1_X/src/blocks/cron/samples/docs.xml Mon Nov 8 05:16:55 2004 @@ -26,10 +26,7 @@ <code>JobScheduler</code></link> is based on the <link href="http://quartz.sf.net">Quartz</link> job scheduling project and the - <link href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/PooledExecutor.html"> - <code>PooledExecutor</code></link> of - <link href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> - Doug Leas Concurrency Package</link> as a ThreadPool implementation for the Quartz Scheduler. + <code>RunnableManager</code></link> as a ThreadPool implementation for the Quartz Scheduler. </p> <p style="background-color: yellow"> @@ -113,9 +110,8 @@ </thread-pool> </source> <p> - As mentioned in the beginning, more information about the thread pool details of the base - <link href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/PooledExecutor.html"> - <code>PooledExecutor</code></link> class can be found there. + As mentioned in the beginning, more information about the thread pool details base on the + <code>RunnableManager#createPool(...)</code></link> method can be found there. 
</p> </section> Modified: cocoon/branches/BRANCH_2_1_X/src/blocks/hsqldb/conf/hsqldb.xconf ============================================================================== --- cocoon/branches/BRANCH_2_1_X/src/blocks/hsqldb/conf/hsqldb.xconf (original) +++ cocoon/branches/BRANCH_2_1_X/src/blocks/hsqldb/conf/hsqldb.xconf Mon Nov 8 05:16:55 2004 @@ -30,6 +30,7 @@ <parameter name="port" value="9002"/> <parameter name="silent" value="true"/> <parameter name="trace" value="false"/> + <parameter name="thread-pool-name" value="daemon"/> </hsqldb-server> </xconf> Modified: cocoon/branches/BRANCH_2_1_X/src/blocks/hsqldb/java/org/apache/cocoon/components/hsqldb/ServerImpl.java ============================================================================== --- cocoon/branches/BRANCH_2_1_X/src/blocks/hsqldb/java/org/apache/cocoon/components/hsqldb/ServerImpl.java (original) +++ cocoon/branches/BRANCH_2_1_X/src/blocks/hsqldb/java/org/apache/cocoon/components/hsqldb/ServerImpl.java Mon Nov 8 05:16:55 2004 @@ -23,6 +23,7 @@ import java.sql.SQLException; import java.sql.Statement; +import org.apache.avalon.framework.CascadingRuntimeException; import org.apache.avalon.framework.activity.Startable; import org.apache.avalon.framework.context.Context; import org.apache.avalon.framework.context.ContextException; @@ -30,16 +31,21 @@ import org.apache.avalon.framework.logger.AbstractLogEnabled; import org.apache.avalon.framework.parameters.Parameterizable; import org.apache.avalon.framework.parameters.Parameters; +import org.apache.avalon.framework.service.ServiceException; +import org.apache.avalon.framework.service.ServiceManager; +import org.apache.avalon.framework.service.Serviceable; import org.apache.avalon.framework.thread.ThreadSafe; import org.apache.cocoon.Constants; +import org.apache.cocoon.components.thread.RunnableManager; +import org.apache.cocoon.components.thread.ThreadPool; /** * This class runs an instance of HSQLDB Server. 
* * @author <a href="mailto:[EMAIL PROTECTED]">Davanum Srinivas</a> * @author <a href="mailto:[EMAIL PROTECTED]">Vadim Gritsenko</a> - * @version CVS $Id: ServerImpl.java,v 1.3 2004/03/05 13:01:56 bdelacretaz Exp $ + * @version CVS $Id$ */ public class ServerImpl extends AbstractLogEnabled implements Server, @@ -47,6 +53,7 @@ Contextualizable, ThreadSafe, Runnable, + Serviceable, Startable { /** Port which HSQLDB server will listen to */ @@ -55,9 +62,15 @@ /** Arguments for running the server */ private String arguments[] = new String[10]; + /** The threadpool name to be used for daemon thread */ + private String m_daemonThreadPoolName = "daemon"; + /** Check if the server has already been started */ private boolean started = false; + /** The [EMAIL PROTECTED] ServiceManager} instance */ + private ServiceManager m_serviceManager; + /** * Initialize the ServerImpl. * A few options can be used : @@ -83,6 +96,7 @@ + ", silent: " + arguments[3] + ", trace: " +arguments[5]); } + m_daemonThreadPoolName = params.getParameter( "thread-pool-name", m_daemonThreadPoolName ); } /** Contextualize this class */ @@ -108,6 +122,15 @@ getLogger().error("IOException - Could not get database directory ", e); } } + /** + * @param serviceManager The <@link ServiceManager} instance + * @throws ServiceException In case we cannot find a service needed + */ + public void service( ServiceManager serviceManager ) + throws ServiceException + { + m_serviceManager = serviceManager; + } /** Start the server */ public void start() { @@ -118,12 +141,24 @@ getLogger().info("HSQLDB backup file has been deleted."); } - Thread server = new Thread(this); + RunnableManager runnableManager = null; + try + { this.getLogger().debug("Intializing hsqldb server thread"); - server.setPriority(Thread.currentThread().getPriority()); - server.setDaemon(true); - server.setName("hsqldb server"); - server.start(); + runnableManager = (RunnableManager)m_serviceManager.lookup(RunnableManager.ROLE); + 
runnableManager.execute( m_daemonThreadPoolName, this ); + } + catch( final ServiceException se ) + { + throw new CascadingRuntimeException( "Cannot get RunnableManager", se ); + } + finally + { + if( null != runnableManager ) + { + m_serviceManager.release( runnableManager ); + } + } } } Modified: cocoon/branches/BRANCH_2_1_X/src/blocks/portal/java/org/apache/cocoon/portal/coplet/adapter/impl/AbstractCopletAdapter.java ============================================================================== --- cocoon/branches/BRANCH_2_1_X/src/blocks/portal/java/org/apache/cocoon/portal/coplet/adapter/impl/AbstractCopletAdapter.java (original) +++ cocoon/branches/BRANCH_2_1_X/src/blocks/portal/java/org/apache/cocoon/portal/coplet/adapter/impl/AbstractCopletAdapter.java Mon Nov 8 05:16:55 2004 @@ -23,6 +23,7 @@ import org.apache.avalon.framework.service.ServiceManager; import org.apache.avalon.framework.service.Serviceable; import org.apache.avalon.framework.thread.ThreadSafe; +import org.apache.cocoon.components.thread.RunnableManager; import org.apache.cocoon.portal.coplet.CopletData; import org.apache.cocoon.portal.coplet.CopletInstanceData; import org.apache.cocoon.portal.coplet.adapter.CopletAdapter; @@ -30,6 +31,7 @@ import org.apache.cocoon.xml.XMLUtils; import org.xml.sax.ContentHandler; import org.xml.sax.SAXException; +import EDU.oswego.cs.dl.util.concurrent.CountDown; /** * This is the abstract base adapter to use pipelines as coplets @@ -60,7 +62,7 @@ * @author <a href="mailto:[EMAIL PROTECTED]">Carsten Ziegeler</a> * @author <a href="mailto:[EMAIL PROTECTED]">Volker Schmitt</a> * - * @version CVS $Id: AbstractCopletAdapter.java,v 1.11 2004/04/25 20:09:34 haul Exp $ + * @version CVS $Id$ */ public abstract class AbstractCopletAdapter extends AbstractLogEnabled @@ -115,14 +117,13 @@ if ( timeout != null ) { final int milli = timeout.intValue() * 1000; LoaderThread loader = new LoaderThread(this, coplet, buffer); - Thread thread = new Thread(loader); - 
thread.start(); + final RunnableManager runnableManager = (RunnableManager)this.manager.lookup( RunnableManager.ROLE ); + runnableManager.execute( loader ); + this.manager.release( runnableManager ); try { - thread.join(milli); + read = loader.join( milli ); } catch (InterruptedException ignore) { - } - if ( loader.finished ) { - read = true; + // ignored } } else { this.streamContent( coplet, buffer ); @@ -202,10 +203,10 @@ final class LoaderThread implements Runnable { - private AbstractCopletAdapter adapter; - private ContentHandler handler; - private CopletInstanceData coplet; - boolean finished; + private final AbstractCopletAdapter adapter; + private final ContentHandler handler; + private final CopletInstanceData coplet; + private final CountDown finished; Exception exception; public LoaderThread(AbstractCopletAdapter adapter, @@ -214,6 +215,7 @@ this.adapter = adapter; this.coplet = coplet; this.handler = handler; + this.finished = new CountDown( 1 ); } public void run() { @@ -222,8 +224,14 @@ } catch (Exception local) { this.exception = local; } finally { - this.finished = true; + this.finished.release(); } + } + + boolean join( final long milis ) + throws InterruptedException + { + return this.finished.attempt( milis ); } } Modified: cocoon/branches/BRANCH_2_1_X/src/blocks/scratchpad/java/org/apache/cocoon/transformation/IncludeTransformer.java ============================================================================== --- cocoon/branches/BRANCH_2_1_X/src/blocks/scratchpad/java/org/apache/cocoon/transformation/IncludeTransformer.java (original) +++ cocoon/branches/BRANCH_2_1_X/src/blocks/scratchpad/java/org/apache/cocoon/transformation/IncludeTransformer.java Mon Nov 8 05:16:55 2004 @@ -15,6 +15,7 @@ */ package org.apache.cocoon.transformation; +import org.apache.avalon.framework.CascadingRuntimeException; import org.apache.avalon.framework.configuration.Configurable; import org.apache.avalon.framework.configuration.Configuration; import 
org.apache.avalon.framework.configuration.ConfigurationException; @@ -27,6 +28,7 @@ import org.apache.cocoon.caching.CacheableProcessingComponent; import org.apache.cocoon.components.source.SourceUtil; import org.apache.cocoon.components.source.impl.MultiSourceValidity; +import org.apache.cocoon.components.thread.RunnableManager; import org.apache.cocoon.environment.SourceResolver; import org.apache.cocoon.util.NetUtils; import org.apache.cocoon.xml.EmbeddedXMLPipe; @@ -618,13 +620,15 @@ public IncludeBuffer(Source source) { this.source = source; - // FIXME Need thread pool component. Based on EDU.oswego.cs.dl.util.concurrent.PooledExecutor. - // See also org.apache.cocoon.components.cron.QuartzJobScheduler.ThreadPool try { - Thread t = new Thread(this); - t.setName("IncludeSource#" + source.getURI()); - t.setDaemon(true); - t.start(); + final RunnableManager runnableManager = (RunnableManager)m_manager.lookup( RunnableManager.ROLE ); + runnableManager.execute( "daemon", this ); // XXX: GP: Do we really need daemon threads here ? 
+ m_manager.release( runnableManager ); + } catch (final ServiceException e) { + // In case we failed to spawn a thread + this.e = new SAXException(e); + m_resolver.release(source); + throw new CascadingRuntimeException( e.getMessage(), e ); } catch (RuntimeException e) { // In case we failed to spawn a thread this.e = new SAXException(e); Modified: cocoon/branches/BRANCH_2_1_X/src/blocks/xsp/java/org/apache/cocoon/components/language/programming/java/JavaLanguage.java ============================================================================== --- cocoon/branches/BRANCH_2_1_X/src/blocks/xsp/java/org/apache/cocoon/components/language/programming/java/JavaLanguage.java (original) +++ cocoon/branches/BRANCH_2_1_X/src/blocks/xsp/java/org/apache/cocoon/components/language/programming/java/JavaLanguage.java Mon Nov 8 05:16:55 2004 @@ -43,7 +43,7 @@ * The Java programming language processor * * @author <a href="mailto:[EMAIL PROTECTED]">Ricardo Rocha</a> - * @version CVS $Id: JavaLanguage.java,v 1.1 2004/03/10 12:58:07 stephan Exp $ + * @version CVS $Id$ */ public class JavaLanguage extends CompiledProgrammingLanguage implements Initializable, ThreadSafe, Serviceable, Disposable { @@ -174,6 +174,9 @@ if (compiler instanceof LogEnabled) { ((LogEnabled)compiler).enableLogging(getLogger()); } + if (compiler instanceof Serviceable) { + ((Serviceable)compiler).service(this.manager); + } int pos = name.lastIndexOf(File.separatorChar); String filename = name.substring(pos + 1); @@ -213,6 +216,9 @@ } catch (IOException e) { getLogger().warn("Error during compilation", e); throw new LanguageException("Error during compilation: " + e.getMessage()); + } catch (ServiceException e) { + getLogger().warn("Could not initialize the compiler", e); + throw new LanguageException("Could not initialize the compiler: " + e.getMessage()); } } Modified: cocoon/branches/BRANCH_2_1_X/src/blocks/xsp/java/org/apache/cocoon/components/language/programming/java/Jikes.java 
============================================================================== --- cocoon/branches/BRANCH_2_1_X/src/blocks/xsp/java/org/apache/cocoon/components/language/programming/java/Jikes.java (original) +++ cocoon/branches/BRANCH_2_1_X/src/blocks/xsp/java/org/apache/cocoon/components/language/programming/java/Jikes.java Mon Nov 8 05:16:55 2004 @@ -25,31 +25,40 @@ import java.util.List; import java.util.StringTokenizer; +import org.apache.avalon.framework.service.ServiceException; +import org.apache.avalon.framework.service.ServiceManager; +import org.apache.avalon.framework.service.Serviceable; import org.apache.cocoon.components.language.programming.CompilerError; +import org.apache.cocoon.components.thread.RunnableManager; +import EDU.oswego.cs.dl.util.concurrent.CountDown; /** * This class wraps IBM's <i>Jikes</i> Java compiler * NOTE: inspired by the Apache Jasper implementation. * @author <a href="mailto:[EMAIL PROTECTED]">Stefano Mazzocchi</a> - * @version CVS $Id: Jikes.java,v 1.1 2004/03/10 12:58:07 stephan Exp $ + * @version CVS $Id$ * @since 2.0 */ -public class Jikes extends AbstractJavaCompiler { +public class Jikes extends AbstractJavaCompiler implements Serviceable { static final int OUTPUT_BUFFER_SIZE = 1024; static final int BUFFER_SIZE = 512; - private class StreamPumper extends Thread { + private ServiceManager m_serviceManager; + + private class StreamPumper implements Runnable { private BufferedInputStream stream; private boolean endOfStream = false; private int SLEEP_TIME = 5; private OutputStream out; + private CountDown m_done; - public StreamPumper(BufferedInputStream is, OutputStream out) { + public StreamPumper(BufferedInputStream is, OutputStream out, CountDown done) { this.stream = is; this.out = out; + m_done = done; } public void pumpStream() throws IOException { @@ -69,12 +78,22 @@ try { while (!endOfStream) { pumpStream(); - sleep(SLEEP_TIME); + Thread.sleep(SLEEP_TIME); } } catch (Exception e) { // 
getLogger().warn("Jikes.run()", e); } + m_done.release(); // signal 'we are finished' } + } + + /** + * Set the {@link ServiceManager} + */ + public void service( ServiceManager serviceManager ) + throws ServiceException + { + m_serviceManager = serviceManager; } /** @@ -128,15 +147,26 @@ BufferedInputStream compilerErr = new BufferedInputStream(p.getErrorStream()); - StreamPumper errPumper = new StreamPumper(compilerErr, tmpErr); + RunnableManager runnableManager = null; + try + { + runnableManager = (RunnableManager)m_serviceManager.lookup( RunnableManager.ROLE ); + } + catch( final ServiceException se ) + { + getLogger().error( "Cannot get RunnableManager", se ); + throw new IOException( "Cannot get RunnableManager" ); + } - errPumper.start(); + final CountDown done = new CountDown( 1 ); + StreamPumper errPumper = new StreamPumper(compilerErr, tmpErr, done); + runnableManager.execute( errPumper ); + m_serviceManager.release( runnableManager ); p.waitFor(); exitValue = p.exitValue(); - // Wait until the complete error stream has been read - errPumper.join(); + done.acquire(); // Wait for StreadmPumper to finish compilerErr.close(); p.destroy(); Modified: cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/cocoon.roles ============================================================================== --- cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/cocoon.roles (original) +++ cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/cocoon.roles Mon Nov 8 05:16:55 2004 @@ -172,4 +172,9 @@ shorthand="request-data-store" default-class="org.apache.cocoon.components.persistence.RequestDataStoreImpl"/> + <!-- Running commands (Runnable) in background --> + <role name="org.apache.cocoon.components.thread.RunnableManager" + shorthand="runnable-manager" + default-class="org.apache.cocoon.components.thread.DefaultRunnableManager"/> + </role-list> Modified: 
cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/flow/ContinuationsManagerImpl.java ============================================================================== --- cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/flow/ContinuationsManagerImpl.java (original) +++ cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/flow/ContinuationsManagerImpl.java Mon Nov 8 05:16:55 2004 @@ -22,7 +22,11 @@ import org.apache.avalon.framework.context.ContextException; import org.apache.avalon.framework.context.Contextualizable; import org.apache.avalon.framework.logger.AbstractLogEnabled; +import org.apache.avalon.framework.service.ServiceException; +import org.apache.avalon.framework.service.ServiceManager; +import org.apache.avalon.framework.service.Serviceable; import org.apache.avalon.framework.thread.ThreadSafe; +import org.apache.cocoon.components.thread.RunnableManager; import org.apache.excalibur.event.Queue; import org.apache.excalibur.event.Sink; @@ -55,7 +59,7 @@ public class ContinuationsManagerImpl extends AbstractLogEnabled implements ContinuationsManager, Component, Configurable, - ThreadSafe, Contextualizable, Instrumentable { + ThreadSafe, Instrumentable, Serviceable { static final int CONTINUATION_ID_LENGTH = 20; static final String EXPIRE_CONTINUATIONS = "expire-continuations"; @@ -66,8 +70,6 @@ protected SecureRandom random; protected byte[] bytes; - protected Sink m_commandSink; - /** * How long does a continuation exist in memory since the last * access? The time is in miliseconds, and the default is 1 hour. 
@@ -101,6 +103,7 @@ private CounterInstrument continuationsCreated; private CounterInstrument continuationsInvalidated; + private ServiceManager serviceManager; public ContinuationsManagerImpl() throws Exception { try { @@ -122,17 +125,25 @@ /** * Get the command sink so that we can be notified of changes */ - public void contextualize(Context context) throws ContextException { - m_commandSink = (Sink) context.get(Queue.ROLE); + public void service(final ServiceManager manager) throws ServiceException { + this.serviceManager = manager; } public void configure(Configuration config) { this.defaultTimeToLive = config.getAttributeAsInteger("time-to-live", (3600 * 1000)); final Configuration expireConf = config.getChild("expirations-check"); + final long initialDelay = expireConf.getChild("offset", true).getValueAsLong(180000); + final long interval = expireConf.getChild("period", true).getValueAsLong(180000); try { - final ContinuationInterrupt interrupt = new ContinuationInterrupt(expireConf); - this.m_commandSink.enqueue(interrupt); + final RunnableManager runnableManager = (RunnableManager)serviceManager.lookup(RunnableManager.ROLE); + runnableManager.execute( new Runnable() { + public void run() + { + expireContinuations(); + } + }, initialDelay, interval); + serviceManager.release(runnableManager); } catch (Exception e) { getLogger().warn("Could not enqueue continuations expiration task. " + "Continuations will not automatically expire.", e); @@ -363,7 +374,7 @@ /** * Remove all continuations which have already expired. 
*/ - private void expireContinuations() { + protected void expireContinuations() { long now = 0; if (getLogger().isDebugEnabled()) { now = System.currentTimeMillis(); @@ -395,51 +406,6 @@ displayAllContinuations(); displayExpireSet(); */ - } - } - - - final class ContinuationInterrupt implements RepeatedCommand { - private final long m_interval; - private final long m_initialDelay; - - /** - * @param expireConf - */ - public ContinuationInterrupt(Configuration expireConf) { - // only periodic time triggers are supported - m_initialDelay = - expireConf.getChild("offset", true).getValueAsLong(100); - m_interval = - expireConf.getChild("period", true).getValueAsLong(100); - } - - /** - * Repeat forever - */ - public int getNumberOfRepeats() { - return -1; - } - - /** - * Get the number of millis to wait between invocations - */ - public long getRepeatInterval() { - return m_interval; - } - - /** - * Get the number of millis to wait for the first invocation - */ - public long getDelayInterval() { - return m_initialDelay; - } - - /** - * expire any continuations that need expiring. - */ - public void execute() throws Exception { - expireContinuations(); } } } Added: cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/thread/BoundedQueue.java ============================================================================== --- (empty file) +++ cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/thread/BoundedQueue.java Mon Nov 8 05:16:55 2004 @@ -0,0 +1,68 @@ +/* + * Copyright 1999-2004 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cocoon.components.thread; + +/** + * Efficient array-based bounded buffer class. Adapted from CPJ, chapter 8, + * which describes the design. + * + * <p> + * [<a + * href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> + * Introduction to this package. </a>] + * </p> + * + * <p></p> + */ +public class BoundedQueue + extends EDU.oswego.cs.dl.util.concurrent.BoundedBuffer + implements Queue +{ + //~ Constructors ----------------------------------------------------------- + + /** + * Create a buffer with the current default capacity + */ + public BoundedQueue( ) + { + super( ); + } + + /** + * Create a BoundedQueue with the given capacity. + * + * @param capacity The capacity + * + * @exception IllegalArgumentException if capacity is less than or equal to zero + */ + public BoundedQueue( int capacity ) + throws IllegalArgumentException + { + super( capacity ); + } + + //~ Methods ---------------------------------------------------------------- + + /** + * Get the current size of the queue as tracked by the inherited <code>usedSlots_</code> counter. + * + * @return The current value of <code>usedSlots_</code> + */ + public int getQueueSize( ) + { + return usedSlots_; + } +} Added: cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/thread/ChannelWrapper.java ============================================================================== --- (empty file) +++ cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/thread/ChannelWrapper.java Mon Nov 8 05:16:55 2004 @@ -0,0 +1,92 @@ +/* + * Copyright 1999-2004 The Apache Software Foundation. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cocoon.components.thread; + +import EDU.oswego.cs.dl.util.concurrent.Channel; + + +/** + * Wrapper around a Channel implementation for constructor convenience + * + * @author <a href="mailto:giacomo.at.apache.org">Giacomo Pati</a> + * @version $Id: ChannelWrapper.java 56702 2004-11-05 22:52:05Z giacomo $ + */ +public class ChannelWrapper + implements Channel +{ + //~ Instance fields -------------------------------------------------------- + + /** The wrapped Channel */ + private Channel m_channel; + + //~ Methods ---------------------------------------------------------------- + + /** + * DOCUMENT ME! + * + * @param channel DOCUMENT ME! 
+ */ + public void setChannel( final Channel channel ) + { + m_channel = channel; + } + + /** + * @see EDU.oswego.cs.dl.util.concurrent.Puttable#offer(java.lang.Object, + * long) + */ + public boolean offer( final Object obj, + final long timeout ) + throws InterruptedException + { + return m_channel.offer( obj, timeout ); + } + + /** + * @see EDU.oswego.cs.dl.util.concurrent.Channel#peek() + */ + public Object peek( ) + { + return m_channel.peek( ); + } + + /** + * @see EDU.oswego.cs.dl.util.concurrent.Takable#poll(long) + */ + public Object poll( final long timeout ) + throws InterruptedException + { + return m_channel.poll( timeout ); + } + + /** + * @see EDU.oswego.cs.dl.util.concurrent.Puttable#put(java.lang.Object) + */ + public void put( final Object obj ) + throws InterruptedException + { + m_channel.put( obj ); + } + + /** + * @see EDU.oswego.cs.dl.util.concurrent.Takable#take() + */ + public Object take( ) + throws InterruptedException + { + return m_channel.take( ); + } +} Added: cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/thread/DefaultRunnableManager.java ============================================================================== --- (empty file) +++ cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/thread/DefaultRunnableManager.java Mon Nov 8 05:16:55 2004 @@ -0,0 +1,859 @@ +/* + * Copyright 1999-2004 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cocoon.components.thread; + +import org.apache.avalon.framework.activity.Disposable; +import org.apache.avalon.framework.activity.Startable; +import org.apache.avalon.framework.configuration.Configurable; +import org.apache.avalon.framework.configuration.Configuration; +import org.apache.avalon.framework.configuration.ConfigurationException; +import org.apache.avalon.framework.logger.AbstractLogEnabled; +import org.apache.avalon.framework.logger.Logger; +import org.apache.avalon.framework.thread.ThreadSafe; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; + + +/** + * The DefaultRunnableManager implements the [EMAIL PROTECTED] RunnableManager} interface + * and is responsible to create [EMAIL PROTECTED] ThreadPool}s and run [EMAIL PROTECTED] Runnable}s + * in them as background commands. + * + * <p> + * The configuration of the <code>DefaultRunnableManager</code>: + * <pre> + * <thread-factory>org.apache.cocoon.components.thread.DefaultThreadFactory</thread-factory> + * <thread-pools> + * <thread-pool> + * <name>default</name> + * <priority>NORM</priority> + * <daemon>false</daemon> + * <queue-size>-1</queue-size> + * <max-pool-size>-1</max-pool-size> + * <min-pool-size>2</min-pool-size> + * <keep-alive-time-ms>20000</keep-alive-time-ms> + * <block-policy>RUN</block-policy> + * <shutdown-graceful>false</shutdown-graceful> + * <shutdown-wait-time-ms>-1</shutdown-wait-time-ms> + * </thread-pool> + * </thread-pools> + * </pre> + * </p> + * + * <p> + * Have a look at + * http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/PooledExecutor.html, + * [EMAIL PROTECTED] EDU.oswego.cs.dl.util.concurrent.PooledExecutor} or the cocoon.xconf + * file for more information. 
+ * </p> + * + * @author <a href="mailto:giacomo.at.apache.org">Giacomo Pati</a> + * @version $Id: DefaultRunnableManager.java 56848 2004-11-07 14:09:23Z giacomo $ + */ +public class DefaultRunnableManager + extends AbstractLogEnabled + implements RunnableManager, Configurable, Disposable, Runnable, Startable, + ThreadSafe +{ + //~ Static fields/initializers --------------------------------------------- + + /** The class name of the default {@link ThreadFactory} */ + public static final String DEFAULT_THREAD_FACTORY = + DefaultThreadFactory.class.getName( ); + + /** The default queue size */ + public static final int DEFAULT_QUEUE_SIZE = -1; + + /** The default maximum pool size */ + public static final int DEFAULT_MAX_POOL_SIZE = 5; + + /** The default minimum pool size */ + public static final int DEFAULT_MIN_POOL_SIZE = 5; + + /** The default thread priority */ + public static final String DEFAULT_THREAD_PRIORITY = "NORM"; + + /** The default daemon mode */ + public static final boolean DEFAULT_DAEMON_MODE = false; + + /** The default keep alive time */ + public static final long DEFAULT_KEEP_ALIVE_TIME = 60000L; + + /** The default way to shutdown gracefully */ + public static final boolean DEFAULT_SHUTDOWN_GRACEFUL = false; + + /** The default shutdown wait time */ + public static final int DEFAULT_SHUTDOWN_WAIT_TIME = -1; + + /** The name of the default thread pool */ + public static final String DEFAULT_THREADPOOL_NAME = "default"; + + //~ Instance fields -------------------------------------------------------- + + /** + * Sorted set of <code>ExecutionInfo</code> instances, based on their next + * execution time. + */ + protected SortedSet m_commandStack = new TreeSet( ); + + /** The managed thread pools */ + final Map m_pools = new HashMap( ); + + /** The configured default ThreadFactory class instance */ + private Class m_defaultThreadFactoryClass; + + /** Keep us running? 
*/ + private boolean m_keepRunning = false; + + //~ Methods ---------------------------------------------------------------- + + /** + * @see org.apache.avalon.framework.configuration.Configurable#configure(org.apache.avalon.framework.configuration.Configuration) + */ + public void configure( final Configuration config ) + throws ConfigurationException + { + final String defaultThreadFactoryName = + config.getChild( "thread-factory" ).getValue( DEFAULT_THREAD_FACTORY ); + + try + { + m_defaultThreadFactoryClass = + Thread.currentThread( ).getContextClassLoader( ).loadClass( defaultThreadFactoryName ); + } + catch( final Exception ex ) + { + throw new ConfigurationException( "Cannot create instance of default thread factory " + + defaultThreadFactoryName, ex ); + } + + final Configuration [] threadpools = + config.getChild( "thread-pools" ).getChildren( "thread-pool" ); + + for( int i = 0; i < threadpools.length; i++ ) + { + final DefaultThreadPool pool = configThreadPool( threadpools[ i ] ); + } + + // Check if a "default" pool has been created + final ThreadPool defaultThreadPool = + (ThreadPool)m_pools.get( DEFAULT_THREADPOOL_NAME ); + + if( null == defaultThreadPool ) + { + createPool( DEFAULT_THREADPOOL_NAME, DEFAULT_QUEUE_SIZE, + DEFAULT_MAX_POOL_SIZE, DEFAULT_MIN_POOL_SIZE, + getPriority( DEFAULT_THREAD_PRIORITY ), + DEFAULT_DAEMON_MODE, DEFAULT_KEEP_ALIVE_TIME, + DefaultThreadPool.POLICY_DEFAULT, + DEFAULT_SHUTDOWN_GRACEFUL, DEFAULT_SHUTDOWN_WAIT_TIME ); + } + } + + /** + * Create a shared ThreadPool + * + * @param name The name of the thread pool + * @param queueSize The size of the queue + * @param maxPoolSize The maximum number of threads + * @param minPoolSize The maximum number of threads + * @param priority The priority of threads created by this pool. 
This is + * one of {@link Thread#MIN_PRIORITY}, {@link + * Thread#NORM_PRIORITY}, or {@link Thread#MAX_PRIORITY} + * @param isDaemon Whether or not threads from the pool should run in daemon + * mode + * @param keepAliveTime How long should a thread be alive for new work to + * be done before it is GCed + * @param blockPolicy The blocking policy to apply if resources are exhausted + * @param shutdownGraceful Should we wait for the queue to finish all + * pending commands? + * @param shutdownWaitTime After what time a normal shutdown should take + * place if a graceful shutdown has not come to an end + * + * @throws IllegalArgumentException If the pool already exists + */ + public void createPool( final String name, + final int queueSize, + final int maxPoolSize, + final int minPoolSize, + final int priority, + final boolean isDaemon, + final long keepAliveTime, + final String blockPolicy, + final boolean shutdownGraceful, + final int shutdownWaitTime ) + { + if( null != m_pools.get( name ) ) + { + throw new IllegalArgumentException( "ThreadPool \"" + name + + "\" already exists" ); + } + + createPool( new DefaultThreadPool( ), name, queueSize, maxPoolSize, + minPoolSize, priority, isDaemon, keepAliveTime, + blockPolicy, shutdownGraceful, shutdownWaitTime ); + } + + /** + * Create a private ThreadPool + * + * @param queueSize The size of the queue + * @param maxPoolSize The maximum number of threads + * @param minPoolSize The minimum number of threads + * @param priority The priority of threads created by this pool. 
This is + * one of [EMAIL PROTECTED] Thread#MIN_PRIORITY}, [EMAIL PROTECTED] + * Thread#NORM_PRIORITY}, or [EMAIL PROTECTED] Thread#MAX_PRIORITY} + * @param isDaemon Whether or not thread from the pool should run in daemon + * mode + * @param keepAliveTime How long should a thread be alive for new work to + * be done before it is GCed + * @param blockPolicy What's the blocking policy is resources are exhausted + * @param shutdownGraceful Should we wait for the queue to finish all + * pending commands? + * @param shutdownWaitTime After what time a normal shutdown should take + * into account if a graceful shutdown has not come to an end + * + * @return A newly created <code>ThreadPool</code> + */ + public ThreadPool createPool( final int queueSize, + final int maxPoolSize, + final int minPoolSize, + final int priority, + final boolean isDaemon, + final long keepAliveTime, + final String blockPolicy, + final boolean shutdownGraceful, + final int shutdownWaitTime ) + { + final DefaultThreadPool pool = new DefaultThreadPool( ); + final String name = "anon-" + pool.hashCode( ); + + return createPool( pool, name, queueSize, maxPoolSize, minPoolSize, + priority, isDaemon, keepAliveTime, blockPolicy, + shutdownGraceful, shutdownWaitTime ); + } + + /** + * @see org.apache.avalon.framework.activity.Disposable#dispose() + */ + public void dispose( ) + { + if( getLogger( ).isDebugEnabled( ) ) + { + getLogger( ).debug( "Disposing all thread pools" ); + } + + for( final Iterator i = m_pools.keySet( ).iterator( ); i.hasNext( ); ) + { + final String poolName = (String)i.next( ); + final DefaultThreadPool pool = + (DefaultThreadPool)m_pools.get( poolName ); + + if( getLogger( ).isDebugEnabled( ) ) + { + getLogger( ).debug( "Disposing thread pool " + + pool.getName( ) ); + } + + pool.shutdown( ); + + if( getLogger( ).isDebugEnabled( ) ) + { + getLogger( ).debug( "Thread pool " + pool.getName( ) + + " disposed" ); + } + } + + try + { + m_pools.clear( ); + } + catch( final Throwable t 
) + { + getLogger( ).error( "Cannot dispose", t ); + } + } + + /** + * Run a [EMAIL PROTECTED] Runnable} in the background using a [EMAIL PROTECTED] ThreadPool} + * + * @param threadPoolName The thread pool name to be used + * @param command The [EMAIL PROTECTED] Runnable} to execute + * @param delay the delay befor first run + * @param interval The interval for repeated runs + * + * @throws IllegalArgumentException DOCUMENT ME! + */ + public void execute( final String threadPoolName, + final Runnable command, + final long delay, + long interval ) + { + if( delay < 0 ) + { + throw new IllegalArgumentException( "delay < 0" ); + } + + if( interval < 0 ) + { + throw new IllegalArgumentException( "interval < 0" ); + } + + ThreadPool pool = (ThreadPool)m_pools.get( threadPoolName ); + + if( null == pool ) + { + getLogger( ).warn( "ThreadPool \"" + threadPoolName + + "\" is not known. Will use ThreadPool \"" + + DEFAULT_THREADPOOL_NAME + "\"" ); + pool = (ThreadPool)m_pools.get( DEFAULT_THREADPOOL_NAME ); + } + + if( getLogger( ).isDebugEnabled( ) ) + { + getLogger( ).debug( "Command entered: " + command.toString( ) + + ",pool=" + pool.getName( ) + ",delay=" + + delay + ",interval=" + interval ); + } + + new ExecutionInfo( pool, command, delay, interval, getLogger( ) ); + } + + /** + * Run a [EMAIL PROTECTED] Runnable} in the background using a [EMAIL PROTECTED] ThreadPool} + * + * @param command The [EMAIL PROTECTED] Runnable} to execute + * @param delay the delay befor first run + * @param interval The interval for repeated runs + */ + public void execute( final Runnable command, + final long delay, + final long interval ) + { + execute( DEFAULT_THREADPOOL_NAME, command, delay, interval ); + } + + /** + * Run a [EMAIL PROTECTED] Runnable} in the background using a [EMAIL PROTECTED] ThreadPool} + * + * @param command The [EMAIL PROTECTED] Runnable} to execute + * @param delay the delay befor first run + */ + public void execute( final Runnable command, + final long 
delay ) + { + execute( DEFAULT_THREADPOOL_NAME, command, delay, 0 ); + } + + /** + * Run a [EMAIL PROTECTED] Runnable} in the background using a [EMAIL PROTECTED] ThreadPool} + * + * @param command The [EMAIL PROTECTED] Runnable} to execute + */ + public void execute( final Runnable command ) + { + execute( DEFAULT_THREADPOOL_NAME, command, 0, 0 ); + } + + /** + * Run a [EMAIL PROTECTED] Runnable} in the background using a [EMAIL PROTECTED] ThreadPool} + * + * @param threadPoolName The thread pool name to be used + * @param command The [EMAIL PROTECTED] Runnable} to execute + * @param delay the delay befor first run + */ + public void execute( final String threadPoolName, + final Runnable command, + final long delay ) + { + execute( threadPoolName, command, delay, 0 ); + } + + /** + * Run a [EMAIL PROTECTED] Runnable} in the background using a [EMAIL PROTECTED] ThreadPool} + * + * @param threadPoolName The thread pool name to be used + * @param command The [EMAIL PROTECTED] Runnable} to execute + */ + public void execute( final String threadPoolName, + final Runnable command ) + { + execute( threadPoolName, command, 0, 0 ); + } + + /** + * Remove a <code>Runnable</code> from the command stack + * + * @param command The <code>Runnable</code> to be removed + */ + public void remove( Runnable command ) + { + synchronized( m_commandStack ) + { + for( final Iterator i = m_commandStack.iterator( ); i.hasNext( ); ) + { + final ExecutionInfo info = (ExecutionInfo)i.next( ); + + if( info.m_command == command ) + { + i.remove( ); + m_commandStack.notifyAll( ); + + return; + } + } + } + + getLogger( ).warn( "Could not find command " + command + + " for removal" ); + } + + /** + * The heart of the command manager + */ + public void run( ) + { + if( getLogger( ).isDebugEnabled( ) ) + { + getLogger( ).debug( "Entering loop" ); + } + + while( m_keepRunning ) + { + synchronized( m_commandStack ) + { + try + { + if( m_commandStack.size( ) > 0 ) + { + final ExecutionInfo info = + 
(ExecutionInfo)m_commandStack.first( ); + final long delay = + info.m_nextRun - System.currentTimeMillis( ); + + if( delay > 0 ) + { + m_commandStack.wait( delay ); + } + } + else + { + if( getLogger( ).isDebugEnabled( ) ) + { + getLogger( ).debug( "No commands available. Will just wait for one" ); + } + + m_commandStack.wait( ); + } + } + catch( final InterruptedException ie ) + { + if( getLogger( ).isDebugEnabled( ) ) + { + getLogger( ).debug( "I've been interrupted" ); + } + } + + if( m_keepRunning ) + { + if( m_commandStack.size( ) > 0 ) + { + final ExecutionInfo info = + (ExecutionInfo)m_commandStack.first( ); + final long delay = + info.m_nextRun - System.currentTimeMillis( ); + + if( delay < 0 ) + { + info.execute( ); + } + } + } + } + } + + if( getLogger( ).isDebugEnabled( ) ) + { + getLogger( ).debug( "Exiting loop" ); + } + } + + /** + * Start the managing thread + * + * @throws Exception DOCUMENT ME! + */ + public void start( ) + throws Exception + { + if( getLogger( ).isDebugEnabled( ) ) + { + getLogger( ).debug( "starting heart" ); + } + + m_keepRunning = true; + ( (ThreadPool)m_pools.get( DEFAULT_THREADPOOL_NAME ) ).execute( this ); + } + + /** + * Stop the managing thread + * + * @throws Exception DOCUMENT ME! + */ + public void stop( ) + throws Exception + { + m_keepRunning = false; + + synchronized( m_commandStack ) + { + m_commandStack.notifyAll( ); + } + } + + /** + * DOCUMENT ME! + * + * @param priority The priority to set as string value. + * + * @return The priority as int value. + */ + private int getPriority( final String priority ) + { + if( "MIN".equalsIgnoreCase( priority ) ) + { + return Thread.MIN_PRIORITY; + } + else if( "NORM".equalsIgnoreCase( priority ) ) + { + return Thread.NORM_PRIORITY; + } + else if( "MAX".equalsIgnoreCase( priority ) ) + { + return Thread.MAX_PRIORITY; + } + else + { + getLogger( ).warn( "Unknown thread priority \"" + priority + + "\". Set to \"NORM\"." 
); + + return Thread.NORM_PRIORITY; + } + } + + /** + * DOCUMENT ME! + * + * @param config DOCUMENT ME! + * + * @return DOCUMENT ME! + * + * @throws ConfigurationException DOCUMENT ME! + */ + private DefaultThreadPool configThreadPool( final Configuration config ) + throws ConfigurationException + { + final String name = config.getChild( "name" ).getValue( ); + final int queueSize = + config.getChild( "queue-size" ).getValueAsInteger( DEFAULT_QUEUE_SIZE ); + final int maxPoolSize = + config.getChild( "max-pool-size" ).getValueAsInteger( DEFAULT_MAX_POOL_SIZE ); + int minPoolSize = + config.getChild( "min-pool-size" ).getValueAsInteger( DEFAULT_MIN_POOL_SIZE ); + + // make sure we have enough threads for the default thread pool as we + // need one for ourself + if( DEFAULT_THREADPOOL_NAME.equals( name ) && + ( ( minPoolSize > 0 ) && ( minPoolSize < DEFAULT_MIN_POOL_SIZE ) ) ) + { + minPoolSize = DEFAULT_MIN_POOL_SIZE; + } + + final String priority = + config.getChild( "priority" ).getValue( DEFAULT_THREAD_PRIORITY ); + final boolean isDaemon = + config.getChild( "daemon" ).getValueAsBoolean( DEFAULT_DAEMON_MODE ); + final long keepAliveTime = + config.getChild( "keep-alive-time-ms" ).getValueAsLong( DEFAULT_KEEP_ALIVE_TIME ); + final String blockPolicy = + config.getChild( "block-policy" ).getValue( DefaultThreadPool.POLICY_DEFAULT ); + final boolean shutdownGraceful = + config.getChild( "shutdown-graceful" ).getValueAsBoolean( DEFAULT_SHUTDOWN_GRACEFUL ); + final int shutdownWaitTime = + config.getChild( "shutdown-wait-time-ms" ).getValueAsInteger( DEFAULT_SHUTDOWN_WAIT_TIME ); + + return createPool( new DefaultThreadPool( ), name, queueSize, + maxPoolSize, minPoolSize, getPriority( priority ), + isDaemon, keepAliveTime, blockPolicy, + shutdownGraceful, shutdownWaitTime ); + } + + /** + * Create a ThreadPool + * + * @param pool DOCUMENT ME! + * @param name DOCUMENT ME! 
+ * @param queueSize The size of the queue + * @param maxPoolSize The maximum number of threads + * @param minPoolSize The maximum number of threads + * @param priority The priority of threads created by this pool. This is + * one of [EMAIL PROTECTED] Thread#MIN_PRIORITY}, [EMAIL PROTECTED] + * Thread#NORM_PRIORITY}, or [EMAIL PROTECTED] Thread#MAX_PRIORITY} + * @param isDaemon Whether or not thread from the pool should run in daemon + * mode + * @param keepAliveTime How long should a thread be alive for new work to + * be done before it is GCed + * @param blockPolicy What's the blocking policy is resources are exhausted + * @param shutdownGraceful Should we wait for the queue to finish all + * pending commands? + * @param shutdownWaitTime After what time a normal shutdown should take + * into account if a graceful shutdown has not come to an end + * + * @return A newly created <code>ThreadPool</code> + */ + private DefaultThreadPool createPool( final DefaultThreadPool pool, + final String name, + final int queueSize, + final int maxPoolSize, + final int minPoolSize, + final int priority, + final boolean isDaemon, + final long keepAliveTime, + final String blockPolicy, + final boolean shutdownGraceful, + final int shutdownWaitTime ) + { + pool.enableLogging( getLogger( ).getChildLogger( name ) ); + pool.setName( name ); + + ThreadFactory factory = null; + + try + { + factory = + (ThreadFactory)m_defaultThreadFactoryClass.newInstance( ); + } + catch( final Exception ex ) + { + getLogger( ).warn( "Cannot instantiate a ThreadFactory from class " + + m_defaultThreadFactoryClass.getName( ) + + ". Will use a " + + DefaultThreadFactory.class.getName( ), ex ); + factory = new DefaultThreadFactory( ); + } + + factory.setPriority( priority ); + factory.setDaemon( isDaemon ); + pool.setThreadFactory( factory ); + pool.setQueue( queueSize ); + pool.setMaximumPoolSize( ( maxPoolSize < 0 ) ? 
Integer.MAX_VALUE + : maxPoolSize ); + + if( minPoolSize < 1 ) + { + getLogger( ).warn( "min-pool-size < 1 for pool \"" + + name + "\". Set to 1" ); + } + + pool.setMinimumPoolSize( ( minPoolSize < 1 ) ? 1 : minPoolSize ); + + if( keepAliveTime < 0 ) + { + getLogger( ).warn( "keep-alive-time-ms < 0 for pool \"" + + name + "\". Set to 1000" ); + } + + pool.setKeepAliveTime( ( keepAliveTime < 0 ) ? 1000 : keepAliveTime ); + pool.setBlockPolicy( blockPolicy ); + pool.setShutdownGraceful( shutdownGraceful ); + pool.setShutdownWaitTimeMs( shutdownWaitTime ); + + synchronized( m_pools ) + { + m_pools.put( name, pool ); + } + + printPoolInfo( pool ); + + return pool; + } + + /** + * DOCUMENT ME! + * + * @param pool DOCUMENT ME! + */ + private void printPoolInfo( final DefaultThreadPool pool ) + { + if( getLogger( ).isInfoEnabled( ) ) + { + if( pool.isQueued( ) ) + { + final StringBuffer msg = new StringBuffer( ); + msg.append( "ThreadPool named \"" ).append( pool.getName( ) ); + msg.append( "\" created with maximum queue-size=" ); + msg.append( pool.getMaxQueueSize( ) ); + msg.append( ",max-pool-size=" ).append( pool.getMaximumPoolSize( ) ); + msg.append( ",min-pool-size=" ).append( pool.getMinimumPoolSize( ) ); + msg.append( ",priority=" ).append( pool.getPriority( ) ); + msg.append( ",isDaemon=" ).append( ( (ThreadFactory)pool.getThreadFactory( ) ).isDaemon( ) ); + msg.append( ",keep-alive-time-ms=" ).append( pool.getKeepAliveTime( ) ); + msg.append( ",block-policy=\"" ).append( pool.getBlockPolicy( ) ); + msg.append( "\",shutdown-wait-time-ms=" ).append( pool.getShutdownWaitTimeMs( ) ); + getLogger( ).info( msg.toString( ) ); + } + else + { + final StringBuffer msg = new StringBuffer( ); + msg.append( "ThreadPool named \"" ).append( pool.getName( ) ); + msg.append( "\" created with no queue,max-pool-size=" ).append( pool.getMaximumPoolSize( ) ); + msg.append( ",min-pool-size=" ).append( pool.getMinimumPoolSize( ) ); + msg.append( ",priority=" ).append( 
pool.getPriority( ) ); + msg.append( ",isDaemon=" ).append( ( (ThreadFactory)pool.getThreadFactory( ) ).isDaemon( ) ); + msg.append( ",keep-alive-time-ms=" ).append( pool.getKeepAliveTime( ) ); + msg.append( ",block-policy=" ).append( pool.getBlockPolicy( ) ); + msg.append( ",shutdown-wait-time-ms=" ).append( pool.getShutdownWaitTimeMs( ) ); + getLogger( ).info( msg.toString( ) ); + } + } + } + + //~ Inner Classes ---------------------------------------------------------- + + /** + * The $classType$ class ... + * + * @author <a href="mailto:giacomo.at.apache.org">Giacomo Pati</a> + * @version $Id: DefaultRunnableManager.java 56848 2004-11-07 14:09:23Z giacomo $ + */ + private class ExecutionInfo + implements Comparable + { + //~ Instance fields ---------------------------------------------------- + + /** Our logger */ + final Logger m_logger; + + /** DOCUMENT ME! */ + final Runnable m_command; + + /** DOCUMENT ME! */ + final ThreadPool m_pool; + + /** DOCUMENT ME! */ + final long m_delay; + + /** DOCUMENT ME! */ + final long m_interval; + + /** DOCUMENT ME! */ + long m_nextRun = 0; + + //~ Constructors ------------------------------------------------------- + + /** + * Creates a new ExecutionInfo object. + * + * @param pool DOCUMENT ME! + * @param command DOCUMENT ME! + * @param delay DOCUMENT ME! + * @param interval DOCUMENT ME! + * @param logger DOCUMENT ME! + */ + ExecutionInfo( final ThreadPool pool, + final Runnable command, + final long delay, + final long interval, + final Logger logger ) + { + m_pool = pool; + m_command = command; + m_delay = delay; + m_interval = interval; + m_logger = logger; + m_nextRun = System.currentTimeMillis( ) + delay; + + synchronized( m_commandStack ) + { + m_commandStack.add( this ); + m_commandStack.notifyAll( ); + } + Thread.yield(); // Give others a chance to run + } + + //~ Methods ------------------------------------------------------------ + + /** + * DOCUMENT ME! + * + * @param other DOCUMENT ME! 
+ * + * @return DOCUMENT ME! + */ + public int compareTo( final Object other ) + { + final ExecutionInfo otherInfo = (ExecutionInfo)other; + + return (int)( m_nextRun - otherInfo.m_nextRun ); + } + + /** + * DOCUMENT ME! + */ + void execute( ) + { + if( getLogger( ).isDebugEnabled( ) ) + { + getLogger( ).debug( "Hand over Command " + + m_command.toString( ) + " to pool \"" + + m_pool.getName( ) + "\" with delay=" + m_delay + + " and interval=" + m_interval ); + } + + synchronized( m_commandStack ) + { + m_commandStack.remove( this ); + m_nextRun = ( ( m_interval > 0 ) + ? ( System.currentTimeMillis( ) + m_interval ) : 0 ); + + if( m_nextRun > 0 ) + { + m_commandStack.add( this ); + } + } + + try + { + m_pool.execute( m_command ); + } + catch( final InterruptedException ie ) + { + if( m_logger.isDebugEnabled( ) ) + { + m_logger.debug( m_command + " has been interrupted" ); + } + } + catch( final Throwable t ) + { + m_logger.error( "Exception thrown by Command " + m_command, t ); + } + } + } +} Added: cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/thread/DefaultThreadFactory.java ============================================================================== --- (empty file) +++ cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/thread/DefaultThreadFactory.java Mon Nov 8 05:16:55 2004 @@ -0,0 +1,100 @@ +/* + * Copyright 1999-2004 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cocoon.components.thread; + +/** + * This class is responsible to create new Thread instances to run a command. + * + * @author <a href="mailto:[EMAIL PROTECTED]">Otego AG, Switzerland</a> + * @version $Id: DefaultThreadFactory.java 56765 2004-11-06 13:54:31Z giacomo $ + */ +public class DefaultThreadFactory + implements ThreadFactory, EDU.oswego.cs.dl.util.concurrent.ThreadFactory +{ + //~ Instance fields -------------------------------------------------------- + + /** The daemon mode */ + private boolean m_isDaemon = false; + + /** The priority of newly created Threads */ + private int m_priority = Thread.NORM_PRIORITY; + + //~ Methods ---------------------------------------------------------------- + + /** + * Set the isDaemon property + * + * @param isDaemon Whether or not new <code>Thread</code> should run as + * daemons. + */ + public void setDaemon( boolean isDaemon ) + { + m_isDaemon = isDaemon; + } + + /** + * Get the isDaemon property + * + * @return Whether or not new <code>Thread</code> will run as daemons. 
+ */ + public boolean isDaemon( ) + { + return m_isDaemon; + } + + /** + * Set the priority newly created <code>Thread</code>s should have + * + * @param priority One of {@link Thread#MIN_PRIORITY}, {@link + * Thread#NORM_PRIORITY}, {@link Thread#MAX_PRIORITY} + */ + public void setPriority( final int priority ) + { + if( ( Thread.MAX_PRIORITY == priority ) || + ( Thread.MIN_PRIORITY == priority ) || + ( Thread.NORM_PRIORITY == priority ) ) + { + m_priority = priority; + } + } + + /** + * Get the priority newly created <code>Thread</code>s will have + * + * @return One of {@link Thread#MIN_PRIORITY}, {@link + * Thread#NORM_PRIORITY}, {@link Thread#MAX_PRIORITY} + */ + public int getPriority( ) + { + return m_priority; + } + + /** + * Create a new Thread for Runnable + * + * @param command The {@link Runnable} + * + * @return A new Thread instance + */ + public Thread newThread( final Runnable command ) + { + final Thread thread = new Thread( command ); + thread.setPriority( m_priority ); + thread.setDaemon( m_isDaemon ); + + return thread; + } +} Added: cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/thread/DefaultThreadPool.java ============================================================================== --- (empty file) +++ cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/thread/DefaultThreadPool.java Mon Nov 8 05:16:55 2004 @@ -0,0 +1,378 @@ +/* + * Copyright 1999-2004 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cocoon.components.thread; + +import org.apache.avalon.framework.logger.LogEnabled; +import org.apache.avalon.framework.logger.Logger; + +import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; + + +/** + * The DefaultThreadPool class implements the {@link ThreadPool} interface. + * Instances of this class are made by the {@link RunnableManager} passing a + * configuration into the <code>configure</code> method. + * + * @author <a href="mailto:giacomo.at.apache.org">Giacomo Pati</a> + * @version CVS $Id: DefaultThreadPool.java 56843 2004-11-07 13:34:30Z giacomo $ + */ +public class DefaultThreadPool + extends PooledExecutor + implements ThreadPool, LogEnabled +{ + //~ Static fields/initializers --------------------------------------------- + + /** Default ThreadPool block policy */ + public static final String POLICY_DEFAULT = POLICY_RUN; + + //~ Instance fields -------------------------------------------------------- + + /** Wraps a channel */ + private ChannelWrapper m_channelWrapper; + + /** Our logger */ + private Logger m_logger; + + /** The Queue */ + private Queue m_queue; + + /** The blocking policy */ + private String m_blockPolicy; + + /** The name of this thread pool */ + private String m_name; + + /** Should we wait for running jobs to terminate on shutdown ? 
*/ + private boolean m_shutdownGraceful; + + /** The priority of threads */ + private int m_priority; + + /** The maximum queue size */ + private int m_queueSize; + + /** How long to wait for running jobs to terminate on disposition */ + private int m_shutdownWaitTimeMs; + + //~ Constructors ----------------------------------------------------------- + + /** + * Create a new pool. + */ + DefaultThreadPool( ) + { + this( new ChannelWrapper( ) ); + } + + /** + * Create a new pool. + * + * @param channel DOCUMENT ME! + */ + private DefaultThreadPool( final ChannelWrapper channel ) + { + super( channel ); + m_channelWrapper = channel; + } + + //~ Methods ---------------------------------------------------------------- + + /** + * DOCUMENT ME! + * + * @return Returns the blockPolicy. + */ + public String getBlockPolicy( ) + { + return m_blockPolicy; + } + + /** + * DOCUMENT ME! + * + * @return maximum size of the queue (0 if isQueued() == false) + * + * @see org.apache.cocoon.components.thread.ThreadPool#getQueueSize() + */ + public int getMaxQueueSize( ) + { + return ( ( m_queueSize < 0 ) ? Integer.MAX_VALUE : m_queueSize ); + } + + /** + * DOCUMENT ME! + * + * @return size of queue (0 if isQueued() == false) + * + * @see org.apache.cocoon.components.thread.ThreadPool#getQueueSize() + */ + public int getMaximumQueueSize( ) + { + return m_queueSize; + } + + /** + * @see org.apache.cocoon.components.thread.ThreadPool#getName() + */ + public String getName( ) + { + return m_name; + } + + /** + * Get the priority used to create Threads + * + * @return {@link Thread#MIN_PRIORITY}, {@link Thread#NORM_PRIORITY}, or + * {@link Thread#MAX_PRIORITY} + */ + public int getPriority( ) + { + return m_priority; + } + + /** + * DOCUMENT ME! 
+ * + * @return current size of the queue (0 if isQueued() == false) + * + * @see org.apache.cocoon.components.thread.ThreadPool#getQueueSize() + */ + public int getQueueSize( ) + { + return m_queue.getQueueSize( ); + } + + /** + * Whether this DefaultThreadPool has a queue + * + * @return Returns the m_isQueued. + * + * @see org.apache.cocoon.components.thread.ThreadPool#isQueued() + */ + public boolean isQueued( ) + { + return m_queueSize != 0; + } + + /** + * Set the logger + * + * @param logger + * + * @see org.apache.avalon.framework.logger.LogEnabled#enableLogging(org.apache.avalon.framework.logger.Logger) + */ + public void enableLogging( Logger logger ) + { + m_logger = logger; + } + + /** + * Execute a command + * + * @param command The {@link Runnable} to execute + * + * @throws InterruptedException In case of interruption + */ + public void execute( Runnable command ) + throws InterruptedException + { + if( getLogger( ).isDebugEnabled( ) ) + { + getLogger( ).debug( "Executing Command: " + command.toString( ) + + ",pool=" + getName( ) ); + } + + super.execute( command ); + } + + /** + * @see org.apache.cocoon.components.thread.ThreadPool#shutdownGraceful() + */ + public void shutdown( ) + { + if( m_shutdownGraceful ) + { + shutdownAfterProcessingCurrentlyQueuedTasks( ); + } + else + { + shutdownNow( ); + } + + try + { + if( getShutdownWaitTimeMs( ) > 0 ) + { + if( ! awaitTerminationAfterShutdown( getShutdownWaitTimeMs( ) ) ) + { + getLogger( ).warn( "running commands have not terminated within " + + getShutdownWaitTimeMs( ) + + "ms. 
Will shut them down by interruption" ); + interruptAll( ); + shutdownNow( ); + } + } + + awaitTerminationAfterShutdown( ); + } + catch( final InterruptedException ie ) + { + getLogger( ).error( "cannot shutdown ThreadPool", ie ); + } + } + + /** + * Set the blocking policy + * + * @param blockPolicy The blocking policy value + */ + void setBlockPolicy( final String blockPolicy ) + { + m_blockPolicy = blockPolicy; + + if( POLICY_ABORT.equalsIgnoreCase( blockPolicy ) ) + { + abortWhenBlocked( ); + } + else if( POLICY_DISCARD.equalsIgnoreCase( blockPolicy ) ) + { + discardWhenBlocked( ); + } + else if( POLICY_DISCARD_OLDEST.equalsIgnoreCase( blockPolicy ) ) + { + discardOldestWhenBlocked( ); + } + else if( POLICY_RUN.equalsIgnoreCase( blockPolicy ) ) + { + runWhenBlocked( ); + } + else if( POLICY_WAIT.equalsIgnoreCase( blockPolicy ) ) + { + waitWhenBlocked( ); + } + else + { + final StringBuffer msg = new StringBuffer( ); + msg.append( "WARNING: Unknown block-policy configuration \"" ) + .append( blockPolicy ); + msg.append( "\". Should be one of \"" ).append( POLICY_ABORT ); + msg.append( "\",\"" ).append( POLICY_DISCARD ); + msg.append( "\",\"" ).append( POLICY_DISCARD_OLDEST ); + msg.append( "\",\"" ).append( POLICY_RUN ); + msg.append( "\",\"" ).append( POLICY_WAIT ); + msg.append( "\". Will use \"" ).append( POLICY_DEFAULT ).append( "\"" ); + getLogger( ).warn( msg.toString( ) ); + setBlockPolicy( POLICY_DEFAULT ); + } + } + + /** + * DOCUMENT ME! + * + * @param name The name to set. + */ + void setName( String name ) + { + m_name = name; + } + + /** + * DOCUMENT ME! + * + * @param priority The priority to set. + */ + void setPriority( final int priority ) + { + m_priority = priority; + } + + /** + * DOCUMENT ME! + * + * @param queueSize DOCUMENT ME! 
+ */ + void setQueue( final int queueSize ) + { + if( queueSize != 0 ) + { + if( queueSize > 0 ) + { + m_queue = new BoundedQueue( queueSize ); + } + else + { + m_queue = new LinkedQueue( ); + } + } + else + { + m_queue = new SynchronousChannel( ); + } + + m_queueSize = queueSize; + m_channelWrapper.setChannel( m_queue ); + } + + /** + * DOCUMENT ME! + * + * @param shutdownGraceful The shutdownGraceful to set. + */ + void setShutdownGraceful( boolean shutdownGraceful ) + { + m_shutdownGraceful = shutdownGraceful; + } + + /** + * DOCUMENT ME! + * + * @return Returns the shutdownGraceful. + */ + boolean isShutdownGraceful( ) + { + return m_shutdownGraceful; + } + + /** + * DOCUMENT ME! + * + * @param shutdownWaitTimeMs The shutdownWaitTimeMs to set. + */ + void setShutdownWaitTimeMs( int shutdownWaitTimeMs ) + { + m_shutdownWaitTimeMs = shutdownWaitTimeMs; + } + + /** + * DOCUMENT ME! + * + * @return Returns the shutdownWaitTimeMs. + */ + int getShutdownWaitTimeMs( ) + { + return m_shutdownWaitTimeMs; + } + + /** + * Get our <code>Logger</code> + * + * @return our <code>Logger</code> + */ + private Logger getLogger( ) + { + return m_logger; + } +} Added: cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/thread/LinkedQueue.java ============================================================================== --- (empty file) +++ cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/thread/LinkedQueue.java Mon Nov 8 05:16:55 2004 @@ -0,0 +1,74 @@ +/* + * Copyright 1999-2004 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cocoon.components.thread; + +/** + * A linked list based channel implementation. The algorithm avoids contention + * between puts and takes when the queue is not empty. Normally a put and a + * take can proceed simultaneously. (Although it does not allow multiple + * concurrent puts or takes.) This class tends to perform more efficiently than + * other Queue implementations in producer/consumer applications. + * + * <p> + * [<a + * href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> + * Introduction to this package. 
</a>] + * </p> + */ +public class LinkedQueue + extends EDU.oswego.cs.dl.util.concurrent.LinkedQueue + implements Queue +{ + //~ Instance fields -------------------------------------------------------- + + /** The size */ + protected int m_size = 0; + + //~ Methods ---------------------------------------------------------------- + + /** + * @see org.apache.cocoon.components.thread.Queue#getQueueSize() + */ + public int getQueueSize( ) + { + return m_size; + } + + /** + * @see EDU.oswego.cs.dl.util.concurrent.LinkedQueue#extract() + */ + protected synchronized Object extract( ) + { + synchronized( head_ ) + { + if( head_.next != null ) + { + --m_size; + } + + return super.extract( ); + } + } + + /** + * @see EDU.oswego.cs.dl.util.concurrent.LinkedQueue#insert(java.lang.Object) + */ + protected void insert( final Object object ) + { + super.insert( object ); + ++m_size; + } +} Added: cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/thread/Queue.java ============================================================================== --- (empty file) +++ cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/thread/Queue.java Mon Nov 8 05:16:55 2004 @@ -0,0 +1,37 @@ +/* + * Copyright 1999-2004 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cocoon.components.thread; + +/** + * Extension to add queue size reporting + * + * @author <a href="mailto:giacomo.at.apache.org">Giacomo Pati</a> + * @version CVS $Id: Queue.java 56702 2004-11-05 22:52:05Z giacomo $ + * + * @see EDU.oswego.cs.dl.util.concurrent.Channel + */ +public interface Queue + extends EDU.oswego.cs.dl.util.concurrent.Channel +{ + //~ Methods ---------------------------------------------------------------- + + /** + * get the current queue size + * + * @return The current queue size + */ + int getQueueSize( ); +} Added: cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/thread/RunnableManager.java ============================================================================== --- (empty file) +++ cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/thread/RunnableManager.java Mon Nov 8 05:16:55 2004 @@ -0,0 +1,162 @@ +/* + * Copyright 1999-2004 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cocoon.components.thread; + +/** + * The RunnableManager interface describes the functionality of an + * implementation running commands in the background. 
+ * + * @author <a href="mailto:giacomo.at.apache.org">Giacomo Pati</a> + * @version CVS $Id: RunnableManager.java 56786 2004-11-06 23:12:39Z giacomo $ + */ +public interface RunnableManager +{ + //~ Instance fields -------------------------------------------------------- + + /** The role name */ + String ROLE = RunnableManager.class.getName( ); + + //~ Methods ---------------------------------------------------------------- + + /** + * Create a shared ThreadPool with a specific {@link ThreadFactory} + * + * @param name The name of the thread pool + * @param queueSize The size of the queue + * @param maxPoolSize The maximum number of threads + * @param minPoolSize The minimum number of threads + * @param priority The priority of threads created by this pool. This is + * one of {@link Thread#MIN_PRIORITY}, {@link + * Thread#NORM_PRIORITY}, or {@link Thread#MAX_PRIORITY} + * @param isDaemon Whether or not threads from the pool should run in daemon + * mode + * @param keepAliveTime How long should a thread be alive for new work to + * be done before it is GCed + * @param blockPolicy What's the blocking policy if resources are exhausted + * @param shutdownGraceful Should we wait for the queue to finish all + * pending commands? + * @param shutdownWaitTime After what time a normal shutdown should take + * into account if a graceful shutdown has not come to an end + */ + void createPool( String name, + int queueSize, + int maxPoolSize, + int minPoolSize, + int priority, + final boolean isDaemon, + long keepAliveTime, + String blockPolicy, + boolean shutdownGraceful, + int shutdownWaitTime ); + + /** + * Create a private ThreadPool with a specific {@link ThreadFactory} + * + * @param queueSize The size of the queue + * @param maxPoolSize The maximum number of threads + * @param minPoolSize The minimum number of threads + * @param priority The priority of threads created by this pool. 
This is + * one of {@link Thread#MIN_PRIORITY}, {@link + * Thread#NORM_PRIORITY}, or {@link Thread#MAX_PRIORITY} + * @param isDaemon Whether or not threads from the pool should run in daemon + * mode + * @param keepAliveTime How long should a thread be alive for new work to + * be done before it is GCed + * @param blockPolicy What's the blocking policy if resources are exhausted + * @param shutdownGraceful Should we wait for the queue to finish all + * pending commands? + * @param shutdownWaitTime After what time a normal shutdown should take + * into account if a graceful shutdown has not come to an end + * + * @return The newly created <code>ThreadPool</code> + */ + ThreadPool createPool( int queueSize, + int maxPoolSize, + int minPoolSize, + int priority, + final boolean isDaemon, + long keepAliveTime, + String blockPolicy, + boolean shutdownGraceful, + int shutdownWaitTime ); + + /** + * Immediate Execution of a runnable in the background + * + * @param command The command to execute + */ + void execute( Runnable command ); + + /** + * Immediate Execution of a runnable in the background + * + * @param command The command to execute + * @param delay The delay before first run + */ + void execute( Runnable command, + long delay ); + + /** + * Immediate Execution of a runnable in the background + * + * @param command The command to execute + * @param delay The delay before first run + * @param interval The interval of repeated runs + */ + void execute( Runnable command, + long delay, + long interval ); + + /** + * Immediate Execution of a runnable in the background + * + * @param threadPoolName The thread pool to use + * @param command The command to execute + */ + void execute( String threadPoolName, + Runnable command ); + + /** + * Immediate Execution of a runnable in the background + * + * @param threadPoolName The thread pool to use + * @param command The command to execute + * @param delay The delay before first run + */ + 
void execute( String threadPoolName, + Runnable command, + long delay ); + + /** + * Delayed and repeated Execution of a runnable in the background + * + * @param threadPoolName The thread pool to use + * @param command The command to execute + * @param delay The delay before first run + * @param interval The interval of repeated runs + */ + void execute( String threadPoolName, + Runnable command, + long delay, + long interval ); + + /** + * Remove a {@link Runnable} from the execution stack + * + * @param command The command to be removed + */ + void remove( Runnable command ); +} Added: cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/thread/SynchronousChannel.java ============================================================================== --- (empty file) +++ cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/thread/SynchronousChannel.java Mon Nov 8 05:16:55 2004 @@ -0,0 +1,59 @@ +/* + * Copyright 1999-2004 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cocoon.components.thread; + +import EDU.oswego.cs.dl.util.concurrent.CyclicBarrier; +import EDU.oswego.cs.dl.util.concurrent.Rendezvous; + + +/** + * A rendezvous channel, similar to those used in CSP and Ada. Each put must + * wait for a take, and vice versa. 
Synchronous channels are well suited for + * handoff designs, in which an object running in one thread must synch up + * with an object running in another thread in order to hand it some + * information, event, or task. + * + * <p> + * If you only need threads to synch up without exchanging information, + * consider using a Barrier. If you need bidirectional exchanges, consider + * using a Rendezvous. + * </p> + * + * <p></p> + * + * <p> + * [<a + * href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> + * Introduction to this package. </a>] + * </p> + * + * @see CyclicBarrier + * @see Rendezvous + */ +public class SynchronousChannel + extends EDU.oswego.cs.dl.util.concurrent.SynchronousChannel + implements Queue +{ + //~ Methods ---------------------------------------------------------------- + + /** + * @see org.apache.cocoon.components.thread.Queue#getQueueSize() + */ + public int getQueueSize( ) + { + return 0; + } +} Added: cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/thread/ThreadFactory.java ============================================================================== --- (empty file) +++ cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/thread/ThreadFactory.java Mon Nov 8 05:16:55 2004 @@ -0,0 +1,68 @@ +/* + * Copyright 1999-2004 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cocoon.components.thread; + +/** + * The ThreadFactory interface describes the responsibility of Factories + * creating Threads for {@link ThreadPool}s of the {@link RunnableManager} + * + * @author <a href="mailto:giacomo.at.apache.org">Giacomo Pati</a> + * @version CVS $Id: ThreadFactory.java 56765 2004-11-06 13:54:31Z giacomo $ + */ +public interface ThreadFactory + extends EDU.oswego.cs.dl.util.concurrent.ThreadFactory +{ + //~ Methods ---------------------------------------------------------------- + + /** + * Set the daemon mode created <code>Thread</code>s should have + * + * @param isDaemon Whether new {@link Thread}s should run as daemons. + */ + void setDaemon( boolean isDaemon ); + + /** + * Get the daemon mode created <code>Thread</code>s will have + * + * @return Whether new {@link Thread}s should run as daemons. + */ + boolean isDaemon( ); + + /** + * Set the priority newly created <code>Thread</code>s should have + * + * @param priority One of {@link Thread#MIN_PRIORITY}, {@link + * Thread#NORM_PRIORITY}, {@link Thread#MAX_PRIORITY} + */ + void setPriority( int priority ); + + /** + * Get the priority newly created <code>Thread</code>s will have + * + * @return One of {@link Thread#MIN_PRIORITY}, {@link + * Thread#NORM_PRIORITY}, {@link Thread#MAX_PRIORITY} + */ + int getPriority( ); + + /** + * Create a new Thread for a {@link Runnable} command + * + * @param command The <code>Runnable</code> + * + * @return new <code>Thread</code> + */ + Thread newThread( Runnable command ); +} Added: cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/thread/ThreadPool.java ============================================================================== --- (empty file) +++ cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/thread/ThreadPool.java Mon Nov 8 05:16:55 2004 @@ -0,0 +1,144 @@ +/* 
+ * Copyright 1999-2004 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cocoon.components.thread; + +/** + * The ThreadPool interface gives access to methods needed to inspect and use + * of a pool of threads + * + * @author <a href="mailto:giacomo.at.apache.org">Giacomo Pati</a> + * @version CVS $Id: ThreadPool.java 56702 2004-11-05 22:52:05Z giacomo $ + */ +public interface ThreadPool +{ + //~ Instance fields -------------------------------------------------------- + + /** ThreadPool block policy ABORT */ + String POLICY_ABORT = "ABORT"; + + /** ThreadPool block policy DISCARD */ + String POLICY_DISCARD = "DISCARD"; + + /** ThreadPool block policy DISCARD-OLDEST */ + String POLICY_DISCARD_OLDEST = "DISCARDOLDEST"; + + /** ThreadPool block policy RUN */ + String POLICY_RUN = "RUN"; + + /** ThreadPool block policy WAIT */ + String POLICY_WAIT = "WAIT"; + + /** The Role name */ + String ROLE = ThreadPool.class.getName( ); + + //~ Methods ---------------------------------------------------------------- + + /** + * The blocking policy used + * + * @return DOCUMENT ME! 
+ */ + String getBlockPolicy( ); + + /** + * How long will a thread in this pool be idle before it is allowed to be + * garbage collected + * + * @return maximum idle time + */ + long getKeepAliveTime( ); + + /** + * How many threads are in this pool at maximum + * + * @return maximum size of pool + */ + int getMaximumPoolSize( ); + + /** + * Maximum size of the queue + * + * @return maximum size of queue + */ + int getMaximumQueueSize( ); + + /** + * How many threads are in this pool at minimum + * + * @return minimum size of pool + */ + int getMinimumPoolSize( ); + + /** + * The Name of this thread pool + * + * @return The name + */ + String getName( ); + + /** + * How many threads are currently in this pool + * + * @return current size of pool + */ + int getPoolSize( ); + + /** + * Get the thread priority used by this pool + * + * @return the thread priority + */ + int getPriority( ); + + /** + * Current size of the queue + * + * @return current size of queue + */ + int getQueueSize( ); + + /** + * Whether this ThreadPool has a queue + * + * @return Returns true if this ThreadPool has a queue + */ + boolean isQueued( ); + + /** + * Returns true if a shutDown method has succeeded in terminating all + * threads + * + * @return Whether a shutDown method has succeeded in terminating all + * threads + */ + boolean isTerminatedAfterShutdown( ); + + /** + * Execute a command using this pool + * + * @param command a {@link Runnable} to execute + * + * @throws InterruptedException In case of interruption + */ + void execute( Runnable command ) + throws InterruptedException; + + /** + * Terminates all threads possibly awaiting processing all elements + * currently in queue. 
+ */ + void shutdown( ); +} Modified: cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/transformation/helpers/DefaultIncludeCacheManager.java ============================================================================== --- cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/transformation/helpers/DefaultIncludeCacheManager.java (original) +++ cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/transformation/helpers/DefaultIncludeCacheManager.java Mon Nov 8 05:16:55 2004 @@ -34,6 +34,7 @@ import org.apache.cocoon.components.sax.XMLSerializer; import org.apache.cocoon.components.sax.XMLTeePipe; import org.apache.cocoon.components.source.SourceUtil; +import org.apache.cocoon.components.thread.RunnableManager; import org.apache.cocoon.xml.XMLConsumer; import org.apache.excalibur.source.Source; import org.apache.excalibur.source.SourceException; @@ -41,6 +42,7 @@ import org.apache.excalibur.source.SourceValidity; import org.apache.excalibur.store.Store; import org.xml.sax.SAXException; +import EDU.oswego.cs.dl.util.concurrent.CountDown; /** * Default implementation of a [EMAIL PROTECTED] IncludeCacheManager}. @@ -55,7 +57,7 @@ * log, so actually cached content is never updated! 
* * @author <a href="mailto:[EMAIL PROTECTED]">Carsten Ziegeler</a> - * @version CVS $Id: DefaultIncludeCacheManager.java,v 1.8 2004/03/18 07:42:12 cziegeler Exp $ + * @version CVS $Id$ * @since 2.1 */ public final class DefaultIncludeCacheManager @@ -108,10 +110,17 @@ if (this.getLogger().isDebugEnabled()) { this.getLogger().debug("Booting preemptive loader: " + this.preemptiveLoaderURI); } - PreemptiveBooter thread = new PreemptiveBooter(); - thread.setURI(this.preemptiveLoaderURI); - thread.start(); - Thread.yield(); + PreemptiveBooter thread = new PreemptiveBooter( this.preemptiveLoaderURI ); + try + { + final RunnableManager runnableManager = (RunnableManager)this.manager.lookup( RunnableManager.ROLE ); + runnableManager.execute( thread ); + this.manager.release( runnableManager ); + } + catch( final ServiceException se ) + { + getLogger().error( "Cannot lookup RunnableManager", se ); + } } } } @@ -221,13 +230,8 @@ this.getLogger().debug("Waiting for pooled thread to finish loading."); } - // wait - while (!loader.finished) { - try { - Thread.sleep(10); - } catch (InterruptedException e) { - } - } + // wait for it + loader.join(); if (this.getLogger().isDebugEnabled()) { this.getLogger().debug("Pooled thread finished loading."); @@ -414,53 +418,59 @@ this.defaultCacheStorage = new StoreIncludeCacheStorageProxy(this.store, this.getLogger()); } -} - -final class LoaderThread implements Runnable { - - private Source source; - private XMLSerializer serializer; - boolean finished; - Exception exception; - byte[] content; - ServiceManager manager; - - public LoaderThread(Source source, - XMLSerializer serializer, - ServiceManager manager) { - this.source = source; - this.serializer = serializer; - this.finished = false; - this.manager = manager; - } - - public void run() { - try { - SourceUtil.toSAX(this.source, this.serializer); - this.content = (byte[])this.serializer.getSAXFragment(); - } catch (Exception local) { - this.exception = local; - } finally { - 
this.manager.release( this.serializer ); - this.finished = true; + final private static class LoaderThread implements Runnable { + + final private Source source; + final private XMLSerializer serializer; + final private CountDown finished; + Exception exception; + byte[] content; + final private ServiceManager manager; + + public LoaderThread(Source source, + XMLSerializer serializer, + ServiceManager manager) { + this.source = source; + this.serializer = serializer; + this.finished = new CountDown( 1 ); + this.manager = manager; + } + + public void run() { + try { + SourceUtil.toSAX(this.source, this.serializer); + this.content = (byte[])this.serializer.getSAXFragment(); + } catch (Exception local) { + this.exception = local; + } finally { + this.manager.release( this.serializer ); + this.finished.release(); + } + } + + void join() { + try { + this.finished.acquire(); + } catch ( final InterruptedException ie) { + // ignore + } } } - -} -final class PreemptiveBooter extends Thread { + final class PreemptiveBooter implements Runnable { - private String uri; - - void setURI(String uri) { - this.uri = uri; - } - - public void run() { - try { - URL url = new URL(this.uri); - url.getContent(); - } catch (Exception ignore) { + private final String uri; + + public PreemptiveBooter( final String uri ) { + this.uri = uri; + } + + public void run() { + try { + URL url = new URL(this.uri); + url.getContent(); + } catch (Exception ignore) { + } } } -} \ No newline at end of file +} Modified: cocoon/branches/BRANCH_2_1_X/src/webapp/WEB-INF/cocoon.xconf ============================================================================== --- cocoon/branches/BRANCH_2_1_X/src/webapp/WEB-INF/cocoon.xconf (original) +++ cocoon/branches/BRANCH_2_1_X/src/webapp/WEB-INF/cocoon.xconf Mon Nov 8 05:16:55 2004 @@ -547,4 +547,113 @@ value="http://localhost:8080/cocoon/samples/cinclude/loader"/> --> </component> + + <!--+ + | Runnable manager + | + | this component manages commands (Runnables) 
executed in background using + | preconfigured pools of worker threads + +--> + <runnable-manager logger="core.runnable"> + <!--+ + | This is the default configuration of the runnable-manager. More + | in-depth information can be found at + | http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/PooledExecutor.html + | The following elements can be used: + | + | thread-factory: specifies the fully qualified class name of an + | org.apache.cocoon.components.thread.ThreadFactory + | implementation. It is responsible for creating Thread + | classes. + | thread-pools: container element for thread-pool elements. + | name: required name of the pool. + | priority: optional priority all threads of the pool will + | have (the ThreadFactory will be set to this + | priority). The possible values are: + | MIN: corresponds to Thread#MIN_PRIORITY + | NORM: corresponds to Thread#NORM_PRIORITY (default) + | MAX: corresponds to Thread#MAX_PRIORITY + | daemon: whether newly created Threads should run in + | daemon mode or not. Defaults to false. + | queue-size: optional size of a queue to hold Runnables if the + | pool is full. Possible values are: + | less than 0: unbounded (default) + | equal to 0: no queue at all + | greater than 0: size of the queue + | max-pool-size: optional maximum number of threads in the pool. + | Defaults to 5. + | NOTE: if a queue is specified (queue-size != 0) + | this value will be ignored. + | min-pool-size: optional minimum number of threads in the pool. + | Defaults to 5. + | NOTE: if a queue has been specified (queue-size != 0) + | this value will be used as the maximum number of + | threads running concurrently. + | keep-alive-time-ms: The time in ms an idle thread is kept alive + | before it might get garbage collected. This + | defaults to 60000 ms. + | block-policy: The policy to be used if all resources (threads in + | the pool and slots in the queue) are exhausted. 
+ | Possible values are: + | ABORT: Throw a RuntimeException + | DISCARD: Throw away the current request + | and return. + | DISCARDOLDEST: Throw away the oldest request + | and return. + | RUN (default): The thread making the execute + | request runs the task itself. + | This policy helps guard against + | lockup. + | WAIT: Wait until a thread becomes + | available. This policy should, in + | general, not be used if the + | minimum number of threads is zero, + | in which case a thread may never + | become available. + | shutdown-graceful: Terminate thread pool after processing all + | Runnables currently in queue. Any Runnable entered + | after this point will be discarded. A shut down + | pool cannot be restarted. This also means that a + | pool will need keep-alive-time-ms to terminate. + | The default is not to shut down gracefully. + | shutdown-wait-time-ms: The time in ms to wait before issuing an + | immediate shutdown after a graceful shutdown + | has been requested. + +--> + <thread-factory>org.apache.cocoon.components.thread.DefaultThreadFactory</thread-factory> + <thread-pools> + <!--+ + | This is the default thread pool. Its use fits best for short + | running background tasks. + +--> + <thread-pool> + <name>default</name> + <priority>NORM</priority> + <daemon>false</daemon> + <queue-size>-1</queue-size> + <max-pool-size>5</max-pool-size> + <min-pool-size>5</min-pool-size> + <keep-alive-time-ms>60000</keep-alive-time-ms> + <block-policy>RUN</block-policy> + <shutdown-graceful>false</shutdown-graceful> + <shutdown-wait-time-ms>-1</shutdown-wait-time-ms> + </thread-pool> + <!--+ + | This thread pool should be used for daemons (permanently running + | threads). 
+ +--> + <thread-pool> + <name>daemon</name> + <priority>NORM</priority> + <daemon>true</daemon> + <queue-size>0</queue-size> + <max-pool-size>-1</max-pool-size> + <min-pool-size>1</min-pool-size> + <keep-alive-time-ms>60000</keep-alive-time-ms> + <block-policy>ABORT</block-policy> + <shutdown-graceful>false</shutdown-graceful> + <shutdown-wait-time-ms>-1</shutdown-wait-time-ms> + </thread-pool> + </thread-pools> + </runnable-manager> </cocoon> Modified: cocoon/branches/BRANCH_2_1_X/status.xml ============================================================================== --- cocoon/branches/BRANCH_2_1_X/status.xml (original) +++ cocoon/branches/BRANCH_2_1_X/status.xml Mon Nov 8 05:16:55 2004 @@ -199,6 +199,11 @@ <changes> <release version="@version@" date="@date@"> + <action dev="GP" type="add"> + Added replacement for Excalibur Event package in org.apache.cocoon.components.thread and migrated most + classes using their own threads to that package + </action> + <action dev="AG" type="update"> Updated antlr to 2.7.4, db-ojb to 1.0.1 </action>