bloritsch 2003/06/12 12:55:10
Modified: event/src/java/org/apache/excalibur/event/command
TPSPThreadManager.java
Log:
TPSPThreadManager is written, but it needs to be validated and refactored.
Perhaps the two classes (SourceRunner and SourceDequeueInterceptor) can
be merged....
Revision Changes Path
1.29 +126 -76
avalon-excalibur/event/src/java/org/apache/excalibur/event/command/TPSPThreadManager.java
Index: TPSPThreadManager.java
===================================================================
RCS file:
/home/cvs/avalon-excalibur/event/src/java/org/apache/excalibur/event/command/TPSPThreadManager.java,v
retrieving revision 1.28
retrieving revision 1.29
diff -u -r1.28 -r1.29
--- TPSPThreadManager.java 12 Jun 2003 14:36:16 -0000 1.28
+++ TPSPThreadManager.java 12 Jun 2003 19:55:09 -0000 1.29
@@ -49,7 +49,7 @@
*/
package org.apache.excalibur.event.command;
-import java.util.Iterator;
+import java.util.*;
import org.apache.commons.collections.StaticBucketMap;
import org.apache.excalibur.event.EventHandler;
@@ -66,22 +66,19 @@
*
* @author <a href="mailto:[EMAIL PROTECTED]">Berin Loritsch</a>
*/
-public final class TPSPThreadManager implements Runnable, ThreadManager
+public final class TPSPThreadManager implements ThreadManager
{
private final StaticBucketMap m_pipelines = new StaticBucketMap();
- private volatile boolean m_done = false;
- private final long m_sleepTime;
- private int m_threadsPerPool = 2;
+ private final int m_maxThreadsPerPool;
+ private final int m_threshold;
+ private final int m_margin;
/**
* The default constructor assumes there is a system property named
* "os.arch.cpus" that has a default for the number of CPUs on a system.
* Otherwise, the value is 1.
- *
- * @throws Exception if there is any problems creating the ThreadManager
*/
public TPSPThreadManager()
- throws Exception
{
this( 2, 1000 );
}
@@ -91,20 +88,26 @@
* either value is less then one, then the value is rewritten as one.
*
* @param maxThreadPerPool The number of processors in the machine
- * @param sleepTime The number of milliseconds to wait between cycles
- *
- * @throws Exception when there is a problem creating the ThreadManager
+ * @param threshold The number of events before a new thread is started
*/
- public TPSPThreadManager( int maxThreadPerPool, long sleepTime )
- throws Exception
+ public TPSPThreadManager( int maxThreadPerPool, int threshold )
{
- m_threadsPerPool = maxThreadPerPool;
-
- m_sleepTime = sleepTime;
+ this(maxThreadPerPool, threshold, (threshold/4));
+ }
- Thread runner = new Thread(this);
- runner.setDaemon(true);
- runner.start();
+ /**
+ * Constructor provides a specified number of threads per processor. If
+ * either value is less then one, then the value is rewritten as one.
+ *
+ * @param maxThreadPerPool The number of processors in the machine
+ * @param threshold The number of events before a new thread is started
+ * @param margin The number of events +/- the threshold for thread
evaluation
+ */
+ public TPSPThreadManager( int maxThreadPerPool, int threshold, int margin )
+ {
+ m_maxThreadsPerPool = maxThreadPerPool;
+ m_threshold = threshold;
+ m_margin = margin;
}
/**
@@ -114,14 +117,28 @@
*/
public void register( EventPipeline pipeline )
{
- m_pipelines.put( pipeline, new PipelineRunner( pipeline ) );
+ Source[] sources = pipeline.getSources();
+ EventHandler handler = pipeline.getEventHandler();
+ List sourceList = new ArrayList(sources.length);
- if( m_done )
+ for (int i = 0; i < sources.length; i++)
{
- Thread runner = new Thread(this);
- runner.setDaemon(true);
- runner.start();
+ PooledExecutor threadPool = new PooledExecutor();
+ threadPool.setMinimumPoolSize(1);
+ threadPool.setMaximumPoolSize(m_maxThreadsPerPool);
+ SourceRunner initRunner = new SourceRunner(sources[i], handler);
+
+ try
+ {
+ threadPool.execute(initRunner);
+ }
+ catch ( InterruptedException e )
+ {
+ }
+
+ sourceList.add(new SourceDequeueInterceptor(initRunner, handler,
threadPool, m_threshold, m_margin));
}
+ m_pipelines.put( pipeline, sourceList );
}
/**
@@ -131,11 +148,12 @@
*/
public void deregister( EventPipeline pipeline )
{
- m_pipelines.remove( pipeline );
-
- if( m_pipelines.isEmpty() )
+ List sources = (List) m_pipelines.remove( pipeline );
+ Iterator it = sources.iterator();
+ while(it.hasNext())
{
- m_done = true;
+ SourceDequeueInterceptor intercept =
(SourceDequeueInterceptor)it.next();
+ intercept.stop();
}
}
@@ -144,77 +162,84 @@
*/
public void deregisterAll()
{
- m_done = true;
- m_pipelines.clear();
+ Iterator it = m_pipelines.keySet().iterator();
+ while(it.hasNext())
+ {
+ deregister((EventPipeline)it.next());
+ }
}
- public void run()
+ protected static final class SourceRunner implements Runnable
{
- while( !m_done )
+ private final Source m_source;
+ private final EventHandler m_handler;
+ private volatile boolean m_keepProcessing;
+
+ protected SourceRunner( final Source source, final EventHandler handler )
{
- Iterator i = m_pipelines.values().iterator();
+ if ( source == null ) throw new NullPointerException("source");
+ if(handler == null)throw new NullPointerException("handler");
+ m_source = source;
+ m_handler = handler;
+ m_keepProcessing = true;
+ }
- while( i.hasNext() )
+ public void run()
+ {
+ while (m_keepProcessing)
{
- Thread runner = new Thread( (PipelineRunner)i.next() );
- runner.setDaemon(true);
- runner.start();
- }
+ Object event = m_source.dequeue();
- if (! m_done)
- {
- try
- {
- Thread.sleep( m_sleepTime );
- }
- catch( InterruptedException ie )
+ if ( event != null )
{
- // ignore and continue processing
+ m_handler.handleEvent( event );
}
+
+ yield();
}
}
- }
-
- /**
- * The PipelineRunner will run the pipelines
- */
- public static final class PipelineRunner implements Runnable
- {
- private final EventPipeline m_pipeline;
/**
- * Create a PipelineRunner
- *
- * @param pipeline The pipeline we are wrapping
+ * A way to make sure we yield the processor up to the next thread.
*/
- protected PipelineRunner( EventPipeline pipeline )
+ private static void yield()
{
- m_pipeline = pipeline;
+ try
+ {
+ Thread.sleep(1);
+ }
+ catch (InterruptedException ie)
+ {
+ //Nothing to do.
+ }
}
- public void run()
+ public void stop()
{
- Source[] sources = m_pipeline.getSources();
- EventHandler handler = m_pipeline.getEventHandler();
+ m_keepProcessing = false;
+ }
- for( int i = 0; i < sources.length; i++ )
- {
- handler.handleEvent( sources[ i ].dequeue() );
- }
+ public Source getSource()
+ {
+ return m_source;
}
}
- public static final class SourceDequeueInterceptor implements DequeueInterceptor
+ protected static final class SourceDequeueInterceptor implements
DequeueInterceptor
{
private final Source m_source;
private final PooledExecutor m_threadPool;
private final int m_threshold;
private final DequeueInterceptor m_parent;
private final int m_margin;
+ private final LinkedList m_runners;
+ private final EventHandler m_handler;
+ private final SourceRunner m_initRunner;
- public SourceDequeueInterceptor( Source source, PooledExecutor threadPool,
int threshold, int margin )
+ public SourceDequeueInterceptor( SourceRunner runner, EventHandler handler,
PooledExecutor threadPool, int threshold, int margin )
{
- if (source == null) throw new NullPointerException("source");
+ if (runner == null) throw new NullPointerException("runner");
+ if (handler == null) throw new NullPointerException("handler");
if (threadPool == null) throw new NullPointerException("threadPool");
if ( threshold < threadPool.getMinimumPoolSize())
throw new IllegalArgumentException("threshold must be higher than
the minimum number" +
@@ -226,13 +251,16 @@
" differnece between threshold
and the thread" +
" pool minimum size" );
- m_source = source;
+ m_source = runner.getSource();
+ m_initRunner = runner;
m_threadPool = threadPool;
m_threshold = threshold;
+ m_runners = new LinkedList();
+ m_handler = handler;
- if (source instanceof Queue)
+ if ( m_source instanceof Queue)
{
- Queue queue = (Queue)source;
+ Queue queue = (Queue) m_source;
m_parent = queue.getDequeueInterceptor();
queue.setDequeueInterceptor(this);
}
@@ -262,8 +290,16 @@
{
if (m_source.size() > (m_threshold + m_margin))
{
- m_threadPool.setMaximumPoolSize(m_threadPool.getPoolSize() + 1);
- m_threadPool.createThreads(1);
+ SourceRunner runner = new SourceRunner(m_source, m_handler);
+ try
+ {
+ m_threadPool.execute(runner);
+ }
+ catch ( InterruptedException e )
+ {
+ }
+
+ m_runners.add( runner );
}
m_parent.before(context);
}
@@ -288,9 +324,23 @@
if (m_source.size() < (m_threshold - m_margin))
{
- m_threadPool.setMaximumPoolSize(
- Math.max(m_threadPool.getMinimumPoolSize(),
m_threadPool.getPoolSize() - 1));
+ if ( m_runners.size() > 0 )
+ {
+ SourceRunner runner = (SourceRunner)m_runners.removeFirst();
+ runner.stop();
+ }
}
+ }
+
+ public void stop()
+ {
+ Iterator it = m_runners.iterator();
+ while(it.hasNext())
+ {
+ ((SourceRunner)it.next()).stop();;
+ }
+
+ m_initRunner.stop();
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]