bloritsch    2003/06/12 12:55:10

  Modified:    event/src/java/org/apache/excalibur/event/command
                        TPSPThreadManager.java
  Log:
  TPSPThreadManager is written, but it needs to be validated and refactored.
  Perhaps the two classes (SourceRunner and SourceDequeueInterceptor) can
  be merged....
  
  Revision  Changes    Path
  1.29      +126 -76   avalon-excalibur/event/src/java/org/apache/excalibur/event/command/TPSPThreadManager.java
  
  Index: TPSPThreadManager.java
  ===================================================================
  RCS file: /home/cvs/avalon-excalibur/event/src/java/org/apache/excalibur/event/command/TPSPThreadManager.java,v
  retrieving revision 1.28
  retrieving revision 1.29
  diff -u -r1.28 -r1.29
  --- TPSPThreadManager.java    12 Jun 2003 14:36:16 -0000      1.28
  +++ TPSPThreadManager.java    12 Jun 2003 19:55:09 -0000      1.29
  @@ -49,7 +49,7 @@
   */
   package org.apache.excalibur.event.command;
   
  -import java.util.Iterator;
  +import java.util.*;
   
   import org.apache.commons.collections.StaticBucketMap;
   import org.apache.excalibur.event.EventHandler;
  @@ -66,22 +66,19 @@
    *
    * @author <a href="mailto:[EMAIL PROTECTED]">Berin Loritsch</a>
    */
  -public final class TPSPThreadManager implements Runnable, ThreadManager
  +public final class TPSPThreadManager implements ThreadManager
   {
       private final StaticBucketMap m_pipelines = new StaticBucketMap();
  -    private volatile boolean m_done = false;
  -    private final long m_sleepTime;
  -    private int m_threadsPerPool = 2;
  +    private final int m_maxThreadsPerPool;
  +    private final int m_threshold;
  +    private final int m_margin;
   
       /**
        * The default constructor assumes there is a system property named
        * "os.arch.cpus" that has a default for the number of CPUs on a system.
        * Otherwise, the value is 1.
  -     *
  -     * @throws Exception if there is any problems creating the ThreadManager
        */
       public TPSPThreadManager()
  -        throws Exception
       {
           this( 2, 1000 );
       }
  @@ -91,20 +88,26 @@
        * either value is less then one, then the value is rewritten as one.
        *
        * @param maxThreadPerPool  The number of processors in the machine
  -     * @param sleepTime         The number of milliseconds to wait between cycles
  -     *
  -     * @throws Exception when there is a problem creating the ThreadManager
  +     * @param threshold         The number of events before a new thread is started
        */
  -    public TPSPThreadManager( int maxThreadPerPool, long sleepTime )
  -        throws Exception
  +    public TPSPThreadManager( int maxThreadPerPool, int threshold )
       {
  -        m_threadsPerPool = maxThreadPerPool;
  -
  -        m_sleepTime = sleepTime;
  +        this(maxThreadPerPool, threshold, (threshold/4));
  +    }
   
  -        Thread runner = new Thread(this);
  -        runner.setDaemon(true);
  -        runner.start();
  +    /**
  +     * Constructor provides a specified number of threads per processor. If
  +     * either value is less then one, then the value is rewritten as one.
  +     *
  +     * @param maxThreadPerPool  The number of processors in the machine
  +     * @param threshold         The number of events before a new thread is started
  +     * @param margin            The number of events +/- the threshold for thread evaluation
  +     */
  +    public TPSPThreadManager( int maxThreadPerPool, int threshold, int margin )
  +    {
  +        m_maxThreadsPerPool = maxThreadPerPool;
  +        m_threshold = threshold;
  +        m_margin = margin;
       }
   
       /**
  @@ -114,14 +117,28 @@
        */
       public void register( EventPipeline pipeline )
       {
  -        m_pipelines.put( pipeline, new PipelineRunner( pipeline ) );
  +        Source[] sources = pipeline.getSources();
  +        EventHandler handler = pipeline.getEventHandler();
  +        List sourceList = new ArrayList(sources.length);
   
  -        if( m_done )
  +        for (int i = 0; i < sources.length; i++)
           {
  -            Thread runner = new Thread(this);
  -            runner.setDaemon(true);
  -            runner.start();
  +            PooledExecutor threadPool = new PooledExecutor();
  +            threadPool.setMinimumPoolSize(1);
  +            threadPool.setMaximumPoolSize(m_maxThreadsPerPool);
  +            SourceRunner initRunner = new SourceRunner(sources[i], handler);
  +
  +            try
  +            {
  +                threadPool.execute(initRunner);
  +            }
  +            catch ( InterruptedException e )
  +            {
  +            }
  +
  +            sourceList.add(new SourceDequeueInterceptor(initRunner, handler, threadPool, m_threshold, m_margin));
           }
  +        m_pipelines.put( pipeline, sourceList );
       }
   
       /**
  @@ -131,11 +148,12 @@
        */
       public void deregister( EventPipeline pipeline )
       {
  -        m_pipelines.remove( pipeline );
  -
  -        if( m_pipelines.isEmpty() )
  +        List sources = (List) m_pipelines.remove( pipeline );
  +        Iterator it = sources.iterator();
  +        while(it.hasNext())
           {
  -            m_done = true;
  +            SourceDequeueInterceptor intercept = (SourceDequeueInterceptor)it.next();
  +            intercept.stop();
           }
       }
   
  @@ -144,77 +162,84 @@
        */
       public void deregisterAll()
       {
  -        m_done = true;
  -        m_pipelines.clear();
  +        Iterator it = m_pipelines.keySet().iterator();
  +        while(it.hasNext())
  +        {
  +            deregister((EventPipeline)it.next());
  +        }
       }
   
  -    public void run()
  +    protected static final class SourceRunner implements Runnable
       {
  -        while( !m_done )
  +        private final Source m_source;
  +        private final EventHandler m_handler;
  +        private volatile boolean m_keepProcessing;
  +
  +        protected SourceRunner( final Source source, final EventHandler handler )
           {
  -            Iterator i = m_pipelines.values().iterator();
  +            if ( source == null ) throw new NullPointerException("source");
  +            if(handler == null)throw new NullPointerException("handler");
  +            m_source = source;
  +            m_handler = handler;
  +            m_keepProcessing = true;
  +        }
   
  -            while( i.hasNext() )
  +        public void run()
  +        {
  +            while (m_keepProcessing)
               {
  -                Thread runner = new Thread( (PipelineRunner)i.next() );
  -                runner.setDaemon(true);
  -                runner.start();
  -            }
  +                Object event = m_source.dequeue();
   
  -            if (! m_done)
  -            {
  -                try
  -                {
  -                    Thread.sleep( m_sleepTime );
  -                }
  -                catch( InterruptedException ie )
  +                if ( event != null )
                   {
  -                    // ignore and continue processing
  +                    m_handler.handleEvent( event );
                   }
  +
  +                yield();
               }
           }
  -    }
  -
  -    /**
  -     * The PipelineRunner will run the pipelines
  -     */
  -    public static final class PipelineRunner implements Runnable
  -    {
  -        private final EventPipeline m_pipeline;
   
           /**
  -         * Create a PipelineRunner
  -         *
  -         * @param pipeline  The pipeline we are wrapping
  +         * A way to make sure we yield the processor up to the next thread.
            */
  -        protected PipelineRunner( EventPipeline pipeline )
  +        private static void yield()
           {
  -            m_pipeline = pipeline;
  +            try
  +            {
  +                Thread.sleep(1);
  +            }
  +            catch (InterruptedException ie)
  +            {
  +                //Nothing to do.
  +            }
           }
   
  -        public void run()
  +        public void stop()
           {
  -            Source[] sources = m_pipeline.getSources();
  -            EventHandler handler = m_pipeline.getEventHandler();
  +            m_keepProcessing = false;
  +        }
   
  -            for( int i = 0; i < sources.length; i++ )
  -            {
  -                handler.handleEvent( sources[ i ].dequeue() );
  -            }
  +        public Source getSource()
  +        {
  +            return m_source;
           }
       }
   
  -    public static final class SourceDequeueInterceptor implements DequeueInterceptor
  +    protected static final class SourceDequeueInterceptor implements DequeueInterceptor
       {
           private final Source m_source;
           private final PooledExecutor m_threadPool;
           private final int m_threshold;
           private final DequeueInterceptor m_parent;
           private final int m_margin;
  +        private final LinkedList m_runners;
  +        private final EventHandler m_handler;
  +        private final SourceRunner m_initRunner;
   
  -        public SourceDequeueInterceptor( Source source, PooledExecutor threadPool, int threshold, int margin )
  +        public SourceDequeueInterceptor( SourceRunner runner, EventHandler handler, PooledExecutor threadPool, int threshold, int margin )
           {
  -            if (source == null) throw new NullPointerException("source");
  +            if (runner == null) throw new NullPointerException("runner");
  +            if (handler == null) throw new NullPointerException("handler");
               if (threadPool == null) throw new NullPointerException("threadPool");
               if ( threshold < threadPool.getMinimumPoolSize())
                   throw new IllegalArgumentException("threshold must be higher than the minimum number" +
  @@ -226,13 +251,16 @@
                                                       " differnece between threshold and the thread" +
                                                       " pool minimum size" );
   
  -            m_source = source;
  +            m_source = runner.getSource();
  +            m_initRunner = runner;
               m_threadPool = threadPool;
               m_threshold = threshold;
  +            m_runners = new LinkedList();
  +            m_handler = handler;
   
  -            if (source instanceof Queue)
  +            if ( m_source instanceof Queue)
               {
  -                Queue queue = (Queue)source;
  +                Queue queue = (Queue) m_source;
                   m_parent = queue.getDequeueInterceptor();
                   queue.setDequeueInterceptor(this);
               }
  @@ -262,8 +290,16 @@
           {
               if (m_source.size() > (m_threshold + m_margin))
               {
  -                m_threadPool.setMaximumPoolSize(m_threadPool.getPoolSize() + 1);
  -                m_threadPool.createThreads(1);
  +                SourceRunner runner = new SourceRunner(m_source, m_handler);
  +                try
  +                {
  +                    m_threadPool.execute(runner);
  +                }
  +                catch ( InterruptedException e )
  +                {
  +                }
  +
  +                m_runners.add( runner );
               }
               m_parent.before(context);
           }
  @@ -288,9 +324,23 @@
   
               if (m_source.size() < (m_threshold - m_margin))
               {
  -                m_threadPool.setMaximumPoolSize(
  -                        Math.max(m_threadPool.getMinimumPoolSize(), m_threadPool.getPoolSize() - 1));
  +                if ( m_runners.size() > 0 )
  +                {
  +                    SourceRunner runner = (SourceRunner)m_runners.removeFirst();
  +                    runner.stop();
  +                }
               }
  +        }
  +
  +        public void stop()
  +        {
  +            Iterator it = m_runners.iterator();
  +            while(it.hasNext())
  +            {
  +                ((SourceRunner)it.next()).stop();;
  +            }
  +
  +            m_initRunner.stop();
           }
       }
   }
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to