proyal 02/05/24 09:32:44
Modified: event/src/test/org/apache/excalibur/event/command/test
TPCThreadManagerTestCase.java
event/src/java/org/apache/excalibur/event/command
TPCThreadManager.java
Log:
TPCThreadManager:
* Made AbstractLogEnabled to support a Logger
* Removed parameters from constructor, now passed via parameterize()
* Made Initializable
* Modified error handling in run() to catch any runtime exceptions to keep
main thread alive
TPCThreadManagerTestCase
* Restyled code
* Modified to work with TPCThreadManager changes
Revision Changes Path
1.2 +65 -29
jakarta-avalon-excalibur/event/src/test/org/apache/excalibur/event/command/test/TPCThreadManagerTestCase.java
Index: TPCThreadManagerTestCase.java
===================================================================
RCS file:
/home/cvs/jakarta-avalon-excalibur/event/src/test/org/apache/excalibur/event/command/test/TPCThreadManagerTestCase.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- TPCThreadManagerTestCase.java 24 May 2002 14:52:45 -0000 1.1
+++ TPCThreadManagerTestCase.java 24 May 2002 16:32:44 -0000 1.2
@@ -9,7 +9,8 @@
import java.io.PrintWriter;
import java.io.StringWriter;
-import junit.framework.TestCase;
+
+import org.apache.avalon.framework.parameters.Parameters;
import org.apache.excalibur.event.DefaultQueue;
import org.apache.excalibur.event.EventHandler;
import org.apache.excalibur.event.Queue;
@@ -20,13 +21,16 @@
import org.apache.excalibur.event.command.EventPipeline;
import org.apache.excalibur.event.command.TPCThreadManager;
+import junit.framework.TestCase;
+
/**
* @author <a href="mailto:[EMAIL PROTECTED]">Gregory Steuck</a>
*/
public class TPCThreadManagerTestCase extends TestCase
{
- public TPCThreadManagerTestCase(String name) {
- super(name);
+ public TPCThreadManagerTestCase( String name )
+ {
+ super( name );
}
// number of milliseconds it reasonably takes the JVM to switch threads
@@ -35,6 +39,17 @@
// number of times the handler should be called
private final static int MINIMAL_NUMBER_INVOCATIONS = 2;
+ private Parameters createParameters( int processors, int
threadsPerProcessor, long sleep )
+ {
+ final Parameters parameters = new Parameters();
+
+ parameters.setParameter( "processors", String.valueOf( processors )
);
+ parameters.setParameter( "threads-per-processor", String.valueOf(
threadsPerProcessor ) );
+ parameters.setParameter( "sleep-time", String.valueOf( sleep ) );
+
+ return parameters;
+ }
+
/**
* Checks TPCThreadManager ability to survive the situation when
* it tries to schedule more tasks than it has threads. Originally
@@ -48,25 +63,37 @@
{
// enforces only 1 thread and no timeout which makes it
// fail quickly
- final TPCThreadManager threadManager = new TPCThreadManager(1, 1, 0);
+ final TPCThreadManager threadManager = new TPCThreadManager();
+
+ threadManager.parameterize( createParameters( 1, 1, 0 ) );
+ threadManager.initialize();
+
// an obviously syncronized component
final StringBuffer result = new StringBuffer();
final StringWriter exceptionBuffer = new StringWriter();
- final PrintWriter errorOut = new PrintWriter(exceptionBuffer);
- threadManager.register(new Pipeline(result, errorOut));
+ final PrintWriter errorOut = new PrintWriter( exceptionBuffer );
+
+ threadManager.register( new Pipeline( result, errorOut ) );
+
// sleeps for 1 more scheduling timeout to surely go over limit
- Thread.sleep(SCHEDULING_TIMEOUT * (MINIMAL_NUMBER_INVOCATIONS + 1));
+ Thread.sleep( SCHEDULING_TIMEOUT * ( MINIMAL_NUMBER_INVOCATIONS + 1
) );
+
int numberCalls = result.length();
+
String msg =
- "Number of calls to handler (" + numberCalls +
- ") is less than the expected number of calls (" +
- MINIMAL_NUMBER_INVOCATIONS + ")";
- assertTrue(msg, numberCalls >= MINIMAL_NUMBER_INVOCATIONS);
+ "Number of calls to handler (" + numberCalls +
+ ") is less than the expected number of calls (" +
+ MINIMAL_NUMBER_INVOCATIONS + ")";
+
+ assertTrue( msg, numberCalls >= MINIMAL_NUMBER_INVOCATIONS );
+
errorOut.flush(); // why not?
+
String stackTrace = exceptionBuffer.toString();
- assertEquals("Exceptions while running the test",
- "",
- stackTrace);
+
+ assertEquals( "Exceptions while running the test",
+ "",
+ stackTrace );
}
private static class Pipeline implements EventPipeline, EventHandler
@@ -76,17 +103,20 @@
private final StringBuffer m_result;
private final PrintWriter m_errorOut;
- Pipeline(StringBuffer resultAccumulator, PrintWriter errorOut)
- throws SinkException
+ Pipeline( StringBuffer resultAccumulator, PrintWriter errorOut )
+ throws SinkException
{
m_result = resultAccumulator;
m_errorOut = errorOut;
// even though TPCThreadManager currently calls event handlers
// when there is nothing to do, that may change
- m_queue.enqueue(new QueueElement() {});
+ m_queue.enqueue( new QueueElement()
+ {
+ } );
}
- public EventHandler getEventHandler() {
+ public EventHandler getEventHandler()
+ {
return this;
}
@@ -100,23 +130,29 @@
return m_queue;
}
-
- public void handleEvent(QueueElement element) {
- handleEvents(new QueueElement[] {element});
+ public void handleEvent( QueueElement element )
+ {
+ handleEvents( new QueueElement[]{element} );
}
- public void handleEvents(QueueElement[] elements) {
+ public void handleEvents( QueueElement[] elements )
+ {
// records the fact that the handler was called
- m_result.append('a');
- try {
+ m_result.append( 'a' );
+ try
+ {
// sleeps to occupy the thread and let thread manager try to
reschedule
- Thread.sleep(SCHEDULING_TIMEOUT);
+ Thread.sleep( SCHEDULING_TIMEOUT );
// enqueues another element to be called again
- m_queue.enqueue(new QueueElement() {});
- } catch (Exception e) {
+ m_queue.enqueue( new QueueElement()
+ {
+ } );
+ }
+ catch( Exception e )
+ {
// fails the test, no exceptions are expected
- e.printStackTrace(m_errorOut);
-
+ e.printStackTrace( m_errorOut );
+
}
}
}
1.16 +115 -48
jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command/TPCThreadManager.java
Index: TPCThreadManager.java
===================================================================
RCS file:
/home/cvs/jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command/TPCThreadManager.java,v
retrieving revision 1.15
retrieving revision 1.16
diff -u -r1.15 -r1.16
--- TPCThreadManager.java 24 May 2002 14:52:45 -0000 1.15
+++ TPCThreadManager.java 24 May 2002 16:32:44 -0000 1.16
@@ -15,7 +15,12 @@
import org.apache.avalon.excalibur.thread.ThreadPool;
import org.apache.avalon.excalibur.thread.impl.ResourceLimitingThreadPool;
import org.apache.avalon.framework.activity.Disposable;
+import org.apache.avalon.framework.activity.Initializable;
+import org.apache.avalon.framework.logger.AbstractLogEnabled;
import org.apache.avalon.framework.logger.NullLogger;
+import org.apache.avalon.framework.parameters.ParameterException;
+import org.apache.avalon.framework.parameters.Parameterizable;
+import org.apache.avalon.framework.parameters.Parameters;
import org.apache.excalibur.event.EventHandler;
import org.apache.excalibur.event.Source;
import org.apache.excalibur.util.SystemUtil;
@@ -23,63 +28,93 @@
/**
* This is a ThreadManager that uses a certain number of threads per
processor.
* The number of threads in the pool is a direct proportion to the number of
- * processors.
+ * processors. The size of the thread pool is (processors *
threads-per-processor) + 1
*
* @author <a href="mailto:[EMAIL PROTECTED]">Berin Loritsch</a>
+ * @author <a href="mailto:[EMAIL PROTECTED]">Peter Royal</a>
*/
-public final class TPCThreadManager
- implements Runnable, ThreadManager, Disposable
+public final class TPCThreadManager extends AbstractLogEnabled
+ implements Runnable, ThreadManager, Parameterizable, Initializable,
Disposable
{
- private final ThreadPool m_threadPool;
private final Mutex m_mutex = new Mutex();
private final HashMap m_pipelines = new HashMap();
+
+ private ThreadPool m_threadPool;
private ThreadControl m_threadControl;
private boolean m_done = false;
- private final long m_sleepTime;
-
- /**
- * The default constructor assumes there is a system property named
"os.arch.cpus"
- * that has a default for the number of CPUs on a system. Otherwise,
the value
- * is 1.
- */
- public TPCThreadManager()
- {
- this( SystemUtil.numProcessors() );
- }
-
- /**
- * Constructor provides one thread per number of processors.
- */
- public TPCThreadManager( int numProcessors )
- {
- this( numProcessors, 1 );
- }
-
- /**
- * Constructor provides a specified number of threads per processor. If
- * either value is less then one, then the value is rewritten as one.
- */
- public TPCThreadManager( int numProcessors, int threadsPerProcessor )
- {
- this( numProcessors, threadsPerProcessor, 1000 );
- }
-
- /**
- * Constructor provides a specified number of threads per processor. If
- * either value is less then one, then the value is rewritten as one.
- */
- public TPCThreadManager( int numProcessors, int threadsPerProcessor,
long sleepTime )
- {
- int processors = Math.max( numProcessors, 1 );
- int threads = Math.max( threadsPerProcessor, 1 );
- ResourceLimitingThreadPool tpool = new ResourceLimitingThreadPool(
"TPCThreadManager",
- (
processors * threads ) + 1, true, true, 1000L, 10L * 1000L );
- tpool.enableLogging( new NullLogger() );
- m_threadPool = tpool;
-
- m_sleepTime = sleepTime;
- m_threadControl = m_threadPool.execute( this );
+ //Set reasonable defaults in case parameterize() is never called.
+ private long m_sleepTime = 1000L;
+ private long m_blockTimeout = 1000L;
+ private int m_processors = 1;
+ private int m_threadsPerProcessor = 1;
+
+ private boolean m_initialized = false;
+
+ /**
+ * The following parameters can be set for this class:
+ *
+ * <table>
+ * <tr>
+ * <th>Name</th> <th>Description</td> <th>Default Value</th>
+ * </tr>
+ * <tr>
+ * <td>processors</td>
+ * <td>Number of processors (Rewritten to 1 if less than one)</td>
+ * <td>System property named "os.arch.cpus", otherwise 1</td>
+ * </tr>
+ * <tr>
+ * <td>threads-per-processor</td>
+ * <td>Threads per processor to use (Rewritten to 1 if less than
one)</td>
+ * <td>1</td>
+ * </tr>
+ * <tr>
+ * <td>sleep-time</td>
+ * <td>Time (in milliseconds) to wait between queue pipeline
processing runs</td>
+ * <td>1000</td>
+ * </tr>
+ * <tr>
+ * <td>block-timeout</td>
+ * <td>Time (in milliseconds) to wait for a thread to process a
pipeline</td>
+ * <td>1000</td>
+ * </tr>
+ * </table>
+ */
+ public void parameterize( Parameters parameters ) throws
ParameterException
+ {
+ this.m_processors =
+ Math.max( parameters.getParameterAsInteger( "processors",
SystemUtil.numProcessors() ), 1 );
+
+ this.m_threadsPerProcessor = Math.max(
parameters.getParameterAsInteger( "threads-per-processor", 1 ), 1 );
+ this.m_sleepTime = parameters.getParameterAsLong( "sleep-time",
1000L );
+ this.m_blockTimeout = parameters.getParameterAsLong(
"block-timeout", 1000L );
+ }
+
+ public void initialize() throws Exception
+ {
+ if( this.m_initialized )
+ {
+ throw new IllegalStateException( "ThreadManager is already
initailized" );
+ }
+
+ final ResourceLimitingThreadPool tpool =
+ new ResourceLimitingThreadPool( "TPCThreadManager",
+ ( m_processors *
m_threadsPerProcessor ) + 1,
+ true,
+ true,
+ this.m_blockTimeout,
+ 10L * 1000L );
+
+ if( null == getLogger() )
+ {
+ this.enableLogging( new NullLogger() );
+ }
+
+ tpool.enableLogging( getLogger() );
+
+ this.m_threadPool = tpool;
+ this.m_threadControl = this.m_threadPool.execute( this );
+ this.m_initialized = true;
}
/**
@@ -87,6 +122,11 @@
*/
public void register( EventPipeline pipeline )
{
+ if( !this.m_initialized )
+ {
+ throw new IllegalStateException( "ThreadManager must be
initialized before registering a pipeline" );
+ }
+
try
{
m_mutex.acquire();
@@ -116,6 +156,11 @@
*/
public void deregister( EventPipeline pipeline )
{
+ if( !this.m_initialized )
+ {
+ throw new IllegalStateException( "ThreadManager must be
initialized before deregistering a pipeline" );
+ }
+
try
{
m_mutex.acquire();
@@ -146,6 +191,11 @@
*/
public void deregisterAll()
{
+ if( !this.m_initialized )
+ {
+ throw new IllegalStateException( "ThreadManager must be
initialized before deregistering pipelines" );
+ }
+
try
{
m_mutex.acquire();
@@ -203,6 +253,23 @@
// that it has no threads available, will still
try
// to go on, hopefully at one point there will be
// a thread to execute our runner
+
+ if( getLogger().isWarnEnabled() )
+ {
+ getLogger().warn( "Unable to execute
pipeline (If out of threads, "
+ + "increase block-timeout
or number of threads per processor", e );
+ }
+ }
+ catch( RuntimeException e )
+ {
+ //We want to catch this, because if an
unexpected runtime exception comes through a single
+ //pipeline it can bring down the primary thread
+
+ if( getLogger().isErrorEnabled() )
+ {
+ getLogger().error( "Exception processing
EventPipeline [msg: " + e.getMessage() + "]",
+ e );
+ }
}
}
}
--
To unsubscribe, e-mail: <mailto:[EMAIL PROTECTED]>
For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>