bloritsch 2003/06/12 07:35:06
Modified: event/src/java/org/apache/excalibur/event/command
TPSPThreadManager.java
Log:
Allow for a margin of error, i.e. the threshold accounts for enqueued items +/- a
margin. If the queue size is greater than the threshold + margin, then we add a
new thread. If the queue size is less than the threshold - margin, then we remove
a thread (bounded by the pool's minimum number of threads).
Revision Changes Path
1.27 +32 -5
avalon-excalibur/event/src/java/org/apache/excalibur/event/command/TPSPThreadManager.java
Index: TPSPThreadManager.java
===================================================================
RCS file:
/home/cvs/avalon-excalibur/event/src/java/org/apache/excalibur/event/command/TPSPThreadManager.java,v
retrieving revision 1.26
retrieving revision 1.27
diff -u -r1.26 -r1.27
--- TPSPThreadManager.java 12 Jun 2003 14:06:52 -0000 1.26
+++ TPSPThreadManager.java 12 Jun 2003 14:35:05 -0000 1.27
@@ -199,7 +199,7 @@
for( int i = 0; i < sources.length; i++ )
{
- handler.handleEvents( sources[ i ].dequeueAll() );
+ handler.handleEvent( sources[ i ].dequeue() );
}
}
}
@@ -210,17 +210,38 @@
private final PooledExecutor m_threadPool;
private final int m_threshold;
private final DequeueInterceptor m_parent;
+ private final int m_margin;
- public SourceDequeueInterceptor( Source source, PooledExecutor threadPool,
int threshold )
+ public SourceDequeueInterceptor( Source source, PooledExecutor threadPool,
int threshold, int margin )
{
if (source == null) throw new NullPointerException("source");
if (threadPool == null) throw new NullPointerException("threadPool");
+ if ( threshold < threadPool.getMinimumPoolSize())
+ throw new IllegalArgumentException("threshold must be higher than
the minimum number" +
+ " of threads for the pool");
+ if ( margin < 0 )
+ throw new IllegalArgumentException("margin must not be less then
zero");
+ if ( threshold - margin <= threadPool.getMinimumPoolSize() )
+ throw new IllegalArgumentException( "The margin must not exceed or
equal the" +
+ " differnece between threshold
and the thread" +
+ " pool minimum size" );
m_source = source;
m_threadPool = threadPool;
m_threshold = threshold;
- m_parent = (source instanceof Queue) ?
((Queue)source).getDequeueInterceptor()
- : new NullDequeueInterceptor();
+
+ if (source instanceof Queue)
+ {
+ Queue queue = (Queue)source;
+ m_parent = queue.getDequeueInterceptor();
+ queue.setDequeueInterceptor(this);
+ }
+ else
+ {
+ m_parent = new NullDequeueInterceptor();
+ }
+
+ m_margin = margin;
}
/**
@@ -239,7 +260,7 @@
*/
public void before( Source context )
{
- if (m_source.size() > m_threshold) m_threadPool.createThreads(1);
+ if (m_source.size() > (m_threshold + m_margin))
m_threadPool.createThreads(1);
m_parent.before(context);
}
@@ -260,6 +281,12 @@
public void after( Source context )
{
m_parent.after(context);
+
+ if (m_source.size() < (m_threshold - m_margin))
+ {
+ m_threadPool.setMaximumPoolSize(
+ Math.max(m_threadPool.getMinimumPoolSize(),
m_threadPool.getPoolSize() - 1));
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]