bloritsch    02/01/28 10:42:26

  Modified:    src/scratchpad/org/apache/avalon/excalibur/command
                        CommandManager.java
  Added:       src/scratchpad/org/apache/avalon/excalibur/command
                        EventPipeline.java TPCThreadManager.java
                        TPSPThreadManager.java ThreadManager.java
  Log:
  first stab at CommandManager/ThreadManager
  
  Revision  Changes    Path
  1.2       +171 -22   
jakarta-avalon-excalibur/src/scratchpad/org/apache/avalon/excalibur/command/CommandManager.java
  
  Index: CommandManager.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-avalon-excalibur/src/scratchpad/org/apache/avalon/excalibur/command/CommandManager.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- CommandManager.java       28 Jan 2002 16:01:00 -0000      1.1
  +++ CommandManager.java       28 Jan 2002 18:42:25 -0000      1.2
  @@ -7,34 +7,43 @@
    */
   package org.apache.avalon.excalibur.command;
   
  +import org.apache.avalon.excalibur.collections.Buffer;
  +import org.apache.avalon.excalibur.collections.VariableSizeBuffer;
  +import org.apache.avalon.excalibur.concurrent.Mutex;
   import org.apache.avalon.excalibur.event.DefaultQueue;
  +import org.apache.avalon.excalibur.event.EventHandler;
   import org.apache.avalon.excalibur.event.Queue;
   import org.apache.avalon.excalibur.event.QueueElement;
   import org.apache.avalon.excalibur.event.Signal;
  -import org.apache.avalon.excalibur.event.EventHandler;
  +import org.apache.avalon.excalibur.event.Sink;
  +
  +import java.util.ArrayList;
  +import java.util.Collections;
  +import java.util.HashMap;
  +import java.util.Iterator;
  +import java.util.Map;
   
   /**
    * The CommandManager handles asynchronous commands from the rest of the 
system.
    * The only exposed piece is the Queue that other components use to give 
Commands
  - * to this system.
  + * to this system.  You <strong>must</strong> register this with a 
ThreadManager
  + * for it to work.
    *
    * @author <a href="mailto:[EMAIL PROTECTED]">Berin Loritsch</a>
    */
  -public class CommandManager implements EventHandler
  +public class CommandManager implements EventPipeline
   {
  -    private final Queue m_queue = new DefaultQueue();
  -    //private final ThreadManager;
  +    private final Queue        m_queue          = new DefaultQueue();
  +    private final HashMap      m_signalHandlers = new HashMap();
  +    private final Mutex        m_mutex          = new Mutex();
  +    private final EventHandler m_eventHandler   = new CommandEventHandler(
  +            Collections.unmodifiableMap( m_signalHandlers ) );
  +    private final Sink[]       m_sinks          = new Sink[] { m_queue };
   
       public CommandManager()
       {
       }
   
  -    /*
  -    public CommandManager( ThreadManager threadManager )
  -    {
  -    }
  -    */
  -
       public final Queue getCommandQueue()
       {
           return m_queue;
  @@ -42,35 +51,175 @@
   
       public final void registerSignalHandler( Signal signal, EventHandler 
handler )
       {
  +        try
  +        {
  +            m_mutex.acquire();
  +            ArrayList handlers = (ArrayList) m_signalHandlers.get( 
signal.getClass() );
  +
  +            if ( null == handlers )
  +            {
  +                handlers = new ArrayList();
  +            }
  +
  +            if ( ! handlers.contains( handler ) )
  +            {
  +                handlers.add( handler );
  +
  +                m_signalHandlers.put( signal.getClass(), handlers );
  +            }
  +        }
  +        catch (InterruptedException ie)
  +        {
  +            // ignore for now
  +        }
  +        finally
  +        {
  +            m_mutex.release();
  +        }
       }
   
       public final void deregisterSignalHandler( Signal signal, EventHandler 
handler )
       {
  -    }
  +        try
  +        {
  +            m_mutex.acquire();
  +            ArrayList handlers = (ArrayList) m_signalHandlers.get( 
signal.getClass() );
   
  -    public final void handleEvents( QueueElement[] elements )
  -    {
  -        for (int i = 0; i < elements.length; i++)
  +            if ( null != handlers )
  +            {
  +                if ( handlers.remove( handler ) )
  +                {
  +                    m_signalHandlers.put( signal.getClass(), handlers );
  +                }
  +
  +                if ( 0 == handlers.size() )
  +                {
  +                    m_signalHandlers.remove( signal.getClass() );
  +                }
  +            }
  +        }
  +        catch (InterruptedException ie)
           {
  -            handleEvent( elements[i] );
  +            // ignore for now
           }
  +        finally
  +        {
  +            m_mutex.release();
  +        }
  +    }
  +
  +    public final Sink[] getSinks()
  +    {
  +        return m_sinks;
       }
   
  -    public final void handleEvent( QueueElement element )
  +    public final EventHandler getEventHandler()
       {
  +        return m_eventHandler;
       }
   
  -    private final static class Runner implements Runnable
  +    private final static class CommandEventHandler implements EventHandler
       {
  -        private final Queue m_queue;
  +        private final Map    m_signalHandlers;
  +        private final Buffer m_delayedCommands = new VariableSizeBuffer();
  +
  +        protected CommandEventHandler( Map signalHandlers )
  +        {
  +            m_signalHandlers = signalHandlers;
  +        }
   
  -        private Runner( Queue queue )
  +        public final void handleEvents( QueueElement[] elements )
           {
  -            m_queue = queue;
  +            for (int i = 0; i < elements.length; i++)
  +            {
  +                handleEvent( elements[i] );
  +            }
  +
  +            int size = m_delayedCommands.size();
  +            for (int i = 0; i < size; i++)
  +            {
  +                DelayedCommandInfo command = (DelayedCommandInfo) 
m_delayedCommands.remove();
  +
  +                if ( System.currentTimeMillis() >= command.m_nextRunTime )
  +                {
  +                    try
  +                    {
  +                        command.m_command.execute();
  +                    } catch (Exception e)
  +                    {
  +                        // ignore for now
  +                    }
  +
  +                    command.m_numExecutions++;
  +
  +                    if ( command.m_repeatable )
  +                    {
  +                        RepeatedCommand cmd = (RepeatedCommand) 
command.m_command;
  +
  +                        if ( cmd.getNumberOfRepeats() < 
command.m_numExecutions )
  +                        {
  +                            command.m_nextRunTime = 
System.currentTimeMillis() +
  +                                    cmd.getRepeatInterval();
  +                            m_delayedCommands.add( command );
  +                        }
  +                    }
  +                }
  +            }
           }
   
  -        public void run()
  +        public final void handleEvent( QueueElement element )
           {
  +            if ( ! ( element instanceof Signal ) )
  +            {
  +                return;
  +            }
  +
  +            if ( ! ( element instanceof Command ) )
  +            {
  +                ArrayList handlers = (ArrayList) m_signalHandlers.get( 
element.getClass() );
  +
  +                if ( null != handlers )
  +                {
  +                    Iterator i = handlers.iterator();
  +
  +                    while ( i.hasNext() )
  +                    {
  +                        EventHandler handler = (EventHandler) i.next();
  +                        handler.handleEvent( element );
  +                    }
  +                }
  +
  +                return;
  +            }
  +
  +            if ( element instanceof DelayedCommand )
  +            {
  +                DelayedCommandInfo commandInfo = new DelayedCommandInfo();
  +                commandInfo.m_command = (DelayedCommand) element;
  +                commandInfo.m_nextRunTime = System.currentTimeMillis() +
  +                        commandInfo.m_command.getDelayInterval();
  +                commandInfo.m_numExecutions = 0;
  +                commandInfo.m_repeatable = element instanceof 
RepeatedCommand;
  +
  +                m_delayedCommands.add( commandInfo );
  +                return;
  +            }
  +
  +            try
  +            {
  +                ((Command) element).execute();
  +            } catch (Exception e)
  +            {
  +                // ignore for now
  +            }
           }
  +    }
  +
  +    private final static class DelayedCommandInfo
  +    {
  +        protected DelayedCommand m_command;
  +        protected long m_nextRunTime;
  +        protected int m_numExecutions;
  +        protected boolean m_repeatable;
       }
   }
  
  
  
  1.1                  
jakarta-avalon-excalibur/src/scratchpad/org/apache/avalon/excalibur/command/EventPipeline.java
  
  Index: EventPipeline.java
  ===================================================================
  /*
   * Copyright (C) The Apache Software Foundation. All rights reserved.
   *
   * This software is published under the terms of the Apache Software License
   * version 1.1, a copy of which has been included with this distribution in
   * the LICENSE.txt file.
   */
  package org.apache.avalon.excalibur.command;
  
  import org.apache.avalon.excalibur.event.Sink;
  import org.apache.avalon.excalibur.event.EventHandler;
  
  /**
   * An EventPipeline is used by the ThreadManager to manage the event Queue and
   * EventHandler relationship.  The ThreadManager manages the automatic 
forwarding
   * of the Events from the queue to the Event Handler.
   *
   * @author <a href="mailto:[EMAIL PROTECTED]">Berin Loritsch</a>
   */
  public interface EventPipeline
  {
      /**
       * There can be many different sinks to merge into a pipeline.  For the
       * CommandManager, there is only one sink.
       */
      Sink[] getSinks();
  
      /**
       * Returns the reference to the EventHandler that the events from all the
       * Sinks get merged into.
       */
      EventHandler getEventHandler();
  }
  
  
  1.1                  
jakarta-avalon-excalibur/src/scratchpad/org/apache/avalon/excalibur/command/TPCThreadManager.java
  
  Index: TPCThreadManager.java
  ===================================================================
  /*
   * Copyright (C) The Apache Software Foundation. All rights reserved.
   *
   * This software is published under the terms of the Apache Software License
   * version 1.1, a copy of which has been included with this distribution in
   * the LICENSE.txt file.
   */
  package org.apache.avalon.excalibur.command;
  
  import org.apache.avalon.framework.parameters.Parameters;
  import org.apache.avalon.excalibur.concurrent.Mutex;
  import org.apache.avalon.excalibur.thread.*;
  import org.apache.avalon.excalibur.thread.impl.ResourceLimitingThreadPool;
  
  import org.apache.avalon.excalibur.event.Sink;
  import org.apache.avalon.excalibur.event.EventHandler;
  
  import java.util.HashSet;
  import java.util.HashMap;
  import java.util.Iterator;
  
  /**
   * This is a ThreadManager that uses a certain number of threads per 
processor.
   * The number of threads in the pool is a direct proportion to the number of
   * processors.
   *
   * @author <a href="mailto:[EMAIL PROTECTED]">Berin Loritsch</a>
   */
  public final class TPCThreadManager implements Runnable
  {
      private final ThreadPool m_threadPool;
      private final Mutex   m_mutex = new Mutex();
      private final HashMap m_pipelines = new HashMap();
      private ThreadControl m_threadControl;
      private       boolean m_done = false;
      private final long    m_sleepTime;
  
      /**
       * The default constructor assumes there is a system property named 
"os.arch.cpus"
       * that has a default for the number of CPUs on a system.  Otherwise, the 
value
       * is 1.
       */
      public TPCThreadManager()
      {
          this( Integer.parseInt( System.getProperty( "os.arch.cpus", "1" ) ) , 
1 );
      }
  
      /**
       * This constructor assumes there is a parameter named "os.arch.cpus"
       * that has a default for the number of CPUs on a system.  Otherwise, the 
value
       * is 1.
       */
      public TPCThreadManager(Parameters params)
      {
          this( params.getParameterAsInteger( "os.arch.cpus", 1 ) , 1 );
      }
  
      /**
       * Constructor provides one thread per number of processors.
       */
      public TPCThreadManager( int numProcessors )
      {
          this( numProcessors, 1 );
      }
  
      /**
       * Constructor provides a specified number of threads per processor.  If
       * either value is less then one, then the value is rewritten as one.
       */
      public TPCThreadManager( int numProcessors, int threadsPerProcessor )
      {
          this( numProcessors, threadsPerProcessor, 1000 );
      }
  
      /**
       * Constructor provides a specified number of threads per processor.  If
       * either value is less then one, then the value is rewritten as one.
       */
      public TPCThreadManager( int numProcessors, int threadsPerProcessor, long 
sleepTime )
      {
          int processors = Math.max( numProcessors, 1 );
          int threads = Math.max( threadsPerProcessor, 1 );
  
          m_threadPool = new ResourceLimitingThreadPool( "TPCThreadManager",
                  ( processors * threads ) + 1, true, true, 1000L, 10L * 1000L 
);
  
          m_sleepTime = sleepTime;
          m_threadControl = m_threadPool.execute( this );
      }
  
      /**
       * Register an EventPipeline with the ThreadManager.
       */
      void register( EventPipeline pipeline )
      {
          try
          {
              m_mutex.acquire();
  
              m_pipelines.put( pipeline, new PipelineRunner( pipeline ) );
  
              if ( m_done )
              {
                  m_threadControl = m_threadPool.execute( this );
              }
          }
          catch ( InterruptedException ie )
          {
              // ignore for now
          }
          finally
          {
              m_mutex.release();
          }
      }
  
      /**
       * Deregister an EventPipeline with the ThreadManager
       */
      void deregister( EventPipeline pipeline )
      {
          try
          {
              m_mutex.acquire();
  
              m_pipelines.remove( pipeline );
  
              if ( m_pipelines.isEmpty() )
              {
                  m_done = true;
                  m_threadControl.join( 1000 );
              }
          }
          catch ( InterruptedException ie )
          {
              // ignore for now
          }
          finally
          {
              m_mutex.release();
          }
      }
  
      /**
       * Deregisters all EventPipelines from this ThreadManager
       */
      void deregisterAll()
      {
          try
          {
              m_mutex.acquire();
  
              m_done = true;
              m_pipelines.clear();
  
              m_threadControl.join( 1000 );
          }
          catch ( InterruptedException ie )
          {
              // ignore for now
          }
          finally
          {
              m_mutex.release();
          }
      }
  
      public void run()
      {
          while ( ! m_done )
          {
              try
              {
                  m_mutex.acquire();
  
                  Iterator i = m_pipelines.values().iterator();
  
                  while ( i.hasNext() )
                  {
                      m_threadPool.execute( (PipelineRunner) i.next() );
                  }
              }
              catch ( InterruptedException ie )
              {
                  // ignore for now
              }
              finally
              {
                  m_mutex.release();
              }
  
              try
              {
                  Thread.sleep( m_sleepTime );
              }
              catch ( InterruptedException ie )
              {
                 // ignore and continue processing
              }
          }
      }
  
      public final static class PipelineRunner implements Runnable
      {
          private final EventPipeline m_pipeline;
  
          protected PipelineRunner( EventPipeline pipeline )
          {
              m_pipeline = pipeline;
          }
  
          public void run()
          {
              Sink[] sinks = m_pipeline.getSinks();
              EventHandler handler = m_pipeline.getEventHandler();
  
              for (int i = 0; i < sinks.length; i++)
              {
                  handler.handleEvents( sinks[i].dequeueAll() );
              }
          }
      }
  }
  
  
  1.1                  
jakarta-avalon-excalibur/src/scratchpad/org/apache/avalon/excalibur/command/TPSPThreadManager.java
  
  Index: TPSPThreadManager.java
  ===================================================================
  /*
   * Copyright (C) The Apache Software Foundation. All rights reserved.
   *
   * This software is published under the terms of the Apache Software License
   * version 1.1, a copy of which has been included with this distribution in
   * the LICENSE.txt file.
   */
  package org.apache.avalon.excalibur.command;
  
  import org.apache.avalon.framework.parameters.Parameters;
  import org.apache.avalon.excalibur.concurrent.Mutex;
  import org.apache.avalon.excalibur.thread.*;
  import org.apache.avalon.excalibur.thread.impl.ResourceLimitingThreadPool;
  
  import org.apache.avalon.excalibur.event.Sink;
  import org.apache.avalon.excalibur.event.EventHandler;
  
  import java.util.HashSet;
  import java.util.HashMap;
  import java.util.Iterator;
  
  /**
   * This is a ThreadManager which provides a threadpool per Sink per 
EventPipeline.
   *
   * @author <a href="mailto:[EMAIL PROTECTED]">Berin Loritsch</a>
   */
  public final class TPSPThreadManager implements Runnable
  {
      private final ThreadPool m_threadPool;
      private final Mutex   m_mutex = new Mutex();
      private final HashMap m_pipelines = new HashMap();
      private ThreadControl m_threadControl;
      private       boolean m_done = false;
      private final long    m_sleepTime;
  
      /**
       * The default constructor assumes there is a system property named 
"os.arch.cpus"
       * that has a default for the number of CPUs on a system.  Otherwise, the 
value
       * is 1.
       */
      public TPSPThreadManager()
      {
          this ( 1, 1, 1000 );
      }
  
      /**
       * Constructor provides a specified number of threads per processor.  If
       * either value is less then one, then the value is rewritten as one.
       */
      public TPSPThreadManager( int numProcessors, int threadsPerProcessor, 
long sleepTime )
      {
          int processors = Math.max( numProcessors, 1 );
          int threads = Math.max( threadsPerProcessor, 1 );
  
          m_threadPool = new ResourceLimitingThreadPool( "TPCThreadManager",
                  ( processors * threads ) + 1, true, true, 1000L, 10L * 1000L 
);
  
          m_sleepTime = sleepTime;
          m_threadControl = m_threadPool.execute( this );
      }
  
      /**
       * Register an EventPipeline with the ThreadManager.
       */
      void register( EventPipeline pipeline )
      {
          try
          {
              m_mutex.acquire();
  
              m_pipelines.put( pipeline, new PipelineRunner( pipeline ) );
  
              if ( m_done )
              {
                  m_threadControl = m_threadPool.execute( this );
              }
          }
          catch ( InterruptedException ie )
          {
              // ignore for now
          }
          finally
          {
              m_mutex.release();
          }
      }
  
      /**
       * Deregister an EventPipeline with the ThreadManager
       */
      void deregister( EventPipeline pipeline )
      {
          try
          {
              m_mutex.acquire();
  
              m_pipelines.remove( pipeline );
  
              if ( m_pipelines.isEmpty() )
              {
                  m_done = true;
                  m_threadControl.join( 1000 );
              }
          }
          catch ( InterruptedException ie )
          {
              // ignore for now
          }
          finally
          {
              m_mutex.release();
          }
      }
  
      /**
       * Deregisters all EventPipelines from this ThreadManager
       */
      void deregisterAll()
      {
          try
          {
              m_mutex.acquire();
  
              m_done = true;
              m_pipelines.clear();
  
              m_threadControl.join( 1000 );
          }
          catch ( InterruptedException ie )
          {
              // ignore for now
          }
          finally
          {
              m_mutex.release();
          }
      }
  
      public void run()
      {
          while ( ! m_done )
          {
              try
              {
                  m_mutex.acquire();
  
                  Iterator i = m_pipelines.values().iterator();
  
                  while ( i.hasNext() )
                  {
                      m_threadPool.execute( (PipelineRunner) i.next() );
                  }
              }
              catch ( InterruptedException ie )
              {
                  // ignore for now
              }
              finally
              {
                  m_mutex.release();
              }
  
              try
              {
                  Thread.sleep( m_sleepTime );
              }
              catch ( InterruptedException ie )
              {
                 // ignore and continue processing
              }
          }
      }
  
      public final static class PipelineRunner implements Runnable
      {
          private final EventPipeline m_pipeline;
  
          protected PipelineRunner( EventPipeline pipeline )
          {
              m_pipeline = pipeline;
          }
  
          public void run()
          {
              Sink[] sinks = m_pipeline.getSinks();
              EventHandler handler = m_pipeline.getEventHandler();
  
              for (int i = 0; i < sinks.length; i++)
              {
                  handler.handleEvents( sinks[i].dequeueAll() );
              }
          }
      }
  }
  
  
  1.1                  
jakarta-avalon-excalibur/src/scratchpad/org/apache/avalon/excalibur/command/ThreadManager.java
  
  Index: ThreadManager.java
  ===================================================================
  /*
   * Copyright (C) The Apache Software Foundation. All rights reserved.
   *
   * This software is published under the terms of the Apache Software License
   * version 1.1, a copy of which has been included with this distribution in
   * the LICENSE.txt file.
   */
  package org.apache.avalon.excalibur.command;
  
  /**
   * A ThreadManager handles the thread policies for EventPipelines.  It works
   * hand in hand with the CommandManager, and can be expanded to work with a
   * SEDA like architecture.
   *
   * @author <a href="mailto:[EMAIL PROTECTED]">Berin Loritsch</a>
   */
  public interface ThreadManager
  {
      /**
       * Register an EventPipeline with the ThreadManager.
       */
      void register( EventPipeline pipeline );
  
      /**
       * Deregister an EventPipeline with the ThreadManager
       */
      void deregister( EventPipeline pipeline );
  
      /**
       * Deregisters all EventPipelines from this ThreadManager
       */
      void deregisterAll();
  }
  
  

--
To unsubscribe, e-mail:   <mailto:[EMAIL PROTECTED]>
For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>

Reply via email to