User: ejort   
  Date: 02/01/18 12:32:42

  Modified:    src/main/javax/management/timer Timer.java
  Log:
  Thread Pooling of Services
  
  Revision  Changes    Path
  1.4       +210 -108  jmx/src/main/javax/management/timer/Timer.java
  
  Index: Timer.java
  ===================================================================
  RCS file: /cvsroot/jboss/jmx/src/main/javax/management/timer/Timer.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- Timer.java        2001/12/17 23:46:06     1.3
  +++ Timer.java        2002/01/18 20:32:42     1.4
  @@ -9,8 +9,7 @@
   import java.io.Serializable;
   
   import java.util.Date;
  -import java.util.Enumeration;
  -import java.util.Hashtable;
  +import java.util.Iterator;
   import java.util.Vector;
   
   import javax.management.InstanceNotFoundException;
  @@ -19,13 +18,15 @@
   import javax.management.NotificationBroadcasterSupport;
   import javax.management.ObjectName;
   
  -// REVIEW: Check synchronization
  +import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
   
  +import org.jboss.mx.util.ThreadPool;
  +
   /**
    * The timer service.
    *
    * @author <a href="mailto:[EMAIL PROTECTED]">Adrian Brock</a>
  - * @version $Revision: 1.3 $
  + * @version $Revision: 1.4 $
    */
   public class Timer
     extends NotificationBroadcasterSupport
  @@ -97,8 +98,18 @@
   
     /**
      * The registered notifications.
  +   */
  +  ConcurrentReaderHashMap notifications = new ConcurrentReaderHashMap();
  +
  +  /**
  +   * The controlling thread
  +   */
  +  private Controller controller = new Controller();
  +
  +  /**
  +   * The date next run of the notifications, zero when nothing to run.
      */
  -  Hashtable notifications = new Hashtable();
  +  long nextRunDate = 0;
   
     // Static --------------------------------------------------------
   
  @@ -122,8 +133,7 @@
       return addNotification(type, message, userData, date, period, 0);
     }
   
  -  // REVIEW: Synchronized because of active/thread start
  -  public synchronized Integer addNotification(String type, String message,
  +  public Integer addNotification(String type, String message,
                   Object userData, Date date, long period, long occurences)
       throws IllegalArgumentException
     {
  @@ -143,9 +153,8 @@
       // Add the registration.
       notifications.put(id, rn);
   
  -    // If we are active, start the thread
  -    if (active)
  -      rn.start();
  +    // Work out if this is the next to run
  +    reschedule(rn, true);
   
       return id;
     }
  @@ -158,7 +167,7 @@
     public Date getDate(Integer id)
     {
       // Make sure there is a registration
  -    RegisteredNotification rn = (RegisteredNotification)notifications.get(id);
  +    RegisteredNotification rn = (RegisteredNotification) notifications.get(id);
       if (rn == null)
         return null;
   
  @@ -174,7 +183,7 @@
     public Long getNbOccurences(Integer id)
     {
       // Make sure there is a registration
  -    RegisteredNotification rn = (RegisteredNotification)notifications.get(id);
  +    RegisteredNotification rn = (RegisteredNotification) notifications.get(id);
       if (rn == null)
         return null;
   
  @@ -184,34 +193,35 @@
   
     public Vector getNotificationIDs(String type)
     {
  -    RegisteredNotification rn = null;
       Vector result = new Vector();
   
       // Loop through the notifications looking for the passed type.
  -    for (Enumeration e = notifications.elements(); e.hasMoreElements();)
  +    Iterator iterator = notifications.values().iterator();
  +    while (iterator.hasNext())
       {
  -      rn = (RegisteredNotification) e.nextElement();
  +      RegisteredNotification rn = (RegisteredNotification) iterator.next();
         if (rn.type.equals(type))
           result.add(rn.id);
       }
  +      
       return result;
     }
   
     public String getNotificationMessage(Integer id)
     {
       // Make sure there is a registration
  -    RegisteredNotification rn = (RegisteredNotification)notifications.get(id);
  +    RegisteredNotification rn = (RegisteredNotification) notifications.get(id);
       if (rn == null)
         return null;
   
  -    // Return the message.
  +    // Return the message
       return rn.message;
     }
   
     public String getNotificationType(Integer id)
     {
       // Make sure there is a registration
  -    RegisteredNotification rn = (RegisteredNotification)notifications.get(id);
  +    RegisteredNotification rn = (RegisteredNotification) notifications.get(id);
       if (rn == null)
         return null;
   
  @@ -222,7 +232,7 @@
     public Object getNotificationUserData(Integer id)
     {
       // Make sure there is a registration
  -    RegisteredNotification rn = (RegisteredNotification)notifications.get(id);
  +    RegisteredNotification rn = (RegisteredNotification) notifications.get(id);
       if (rn == null)
         return null;
   
  @@ -233,7 +243,7 @@
     public Long getPeriod(Integer id)
     {
       // Make sure there is a registration
  -    RegisteredNotification rn = (RegisteredNotification)notifications.get(id);
  +    RegisteredNotification rn = (RegisteredNotification) notifications.get(id);
       if (rn == null)
         return null;
   
  @@ -256,70 +266,49 @@
       return notifications.isEmpty();
     }
   
  -  // REVIEW: Synchronized because of active/thread stop and enumeration
  -  public synchronized void removeAllNotifications()
  +  public void removeAllNotifications()
     {
  -    RegisteredNotification rn = null;
  +    // Remove the notifications
  +    notifications.clear();
   
  -    // Loop through the notifications removing everything.
  -    for (Enumeration e = notifications.elements(); e.hasMoreElements();)
  +    // The spec says to reset the identifiers, seems like a bad idea to me
  +    synchronized (this)
       {
  -      rn = (RegisteredNotification) e.nextElement();
  -      notifications.remove(rn.id);
  -
  -      // Stop the thread
  -      rn.nextDate = 0;
  -      if (active)
  -        rn.interrupt();
  +       nextId = 0;
       }
  -
  -    // The spec says to reset the identifiers, seems like a bad idea to me
  -    nextId = 0;
     }
   
  -  // REVIEW: Synchronized because of active/thread stop and enumeration
  -  public synchronized void removeNotification(Integer id)
  +  public void removeNotification(Integer id)
       throws InstanceNotFoundException
     {
       // Check if there is a notification.
  -    RegisteredNotification rn = (RegisteredNotification)notifications.get(id);
  +    RegisteredNotification rn = (RegisteredNotification) notifications.get(id);
       if (rn == null)
         throw new InstanceNotFoundException("No notification id : " +
                                             id.toString());
   
       // Remove the notification
       notifications.remove(id);
  -
  -    // Stop the thread
  -    rn.nextDate = 0;
  -    if (active)
  -      rn.interrupt();
     }
   
  -  // REVIEW: Synchronized because of active/thread stop and enumeration
  -  public synchronized void removeNotifications(String type)
  +  public void removeNotifications(String type)
       throws InstanceNotFoundException
     {
  -    RegisteredNotification rn = null;
       boolean found = false;
   
       // Loop through the notifications removing the passed type.
  -    for (Enumeration e = notifications.elements(); e.hasMoreElements();)
  +    Iterator iterator = notifications.values().iterator();
  +    while (iterator.hasNext())
       {
  -      rn = (RegisteredNotification) e.nextElement();
  +      RegisteredNotification rn = (RegisteredNotification) iterator.next();
         if (rn.type.equals(type))
         {
  -        notifications.remove(rn.id);
  +        iterator.remove();
           found = true;
  -
  -        // Stop the thread
  -        rn.nextDate = 0;
  -        if (active)
  -          rn.interrupt();
         }
       }
   
  -    // The spec says to through an error when nothing removed.
  +    // The spec says to through an exception when nothing removed.
       if (found == false)
         throw new InstanceNotFoundException("Nothing registered for type: " +
                                             type);
  @@ -330,36 +319,40 @@
       sendPastNotifications = value;
     }
   
  -  // REVIEW: Synchronized because of active/thread stop and enumeration
     public synchronized void start()
     {
       // Ignore if already active
  -    if (active)
  +    if (active == true)
         return;
       active = true;
   
  -    // Start the threads for each notification
  -    for (Enumeration e = notifications.elements(); e.hasMoreElements();)
  +    // Perform the initial sends, for past notifications send missed events
  +    // otherwise ignore them
  +    Iterator iterator = notifications.values().iterator();
  +    while (iterator.hasNext())
       {
  -      ((Thread) e.nextElement()).start();
  +      RegisteredNotification rn = (RegisteredNotification) iterator.next();
  +      if (sendPastNotifications)
  +        rn.sendType = SEND_START;
  +      else
  +        rn.sendType = SEND_NO;
  +      sendNotifications(rn);
  +      rn.sendType = SEND_NORMAL;
       }
  +
  +    // Start the controlling thread
  +    controller.start();
     }
   
  -  // REVIEW: Synchronized because of active/thread stop and enumeration
     public synchronized void stop()
     {
       // Ignore if not active
  -    if (!active)
  +    if (active == false)
         return;
   
       // Stop the threads
       active = false;
  -
  -    // Ask each thread to stop
  -    for (Enumeration e = notifications.elements(); e.hasMoreElements();)
  -    {
  -      ((Thread) e.nextElement()).interrupt();
  -    }
  +    controller.cancel();
     }
   
     // MBeanRegistrationImplementation overrides ---------------------
  @@ -393,15 +386,51 @@
     // Private -------------------------------------------------------
   
     /**
  +   * Recalculate the next run for inactive notifications.
  +   */
  +  private void reschedule()
  +  {
  +    // No need to reschedule when not active
  +    if (isActive() == false)
  +      return;
  +
  +    nextRunDate = 0;
  +
  +    // Loop through the registered notifications to find the next run.
  +    Iterator iterator = notifications.values().iterator();
  +    while (iterator.hasNext())
  +    {
  +      RegisteredNotification rn = (RegisteredNotification) iterator.next();
  +      if (rn.running == false)
  +         reschedule(rn, false);
  +    }
  +  }
  +
  +  /**
  +   * See whether this notification is the next one.
  +   *
  +   * @param rn the notification to check
  +   * @param notify whether to notify the controller of changes
  +   */
  +  private void reschedule(RegisteredNotification rn, boolean notify)
  +  {
  +    synchronized (this)
  +    {
  +      if (nextRunDate == 0 || rn.nextDate < nextRunDate)
  +      {
  +        nextRunDate = rn.nextDate;
  +        if (notify == true)
  +          controller.interrupt();
  +      }
  +    }
  +  }
  +
  +  /**
      * Send any outstanding notifications.
      *
      * @param rn the registered notification to send.
  -   * @param doSend whether the send operations should be performed.
  -   *        Pass SEND_NO for startup and not sending past notification,
  -   *        pass SEND_START for startup and sending past notifications,
  -   *        pass SEND_NORMAL for normal operation.
      */
  -  private void sendNotifications(RegisteredNotification rn, int doSend)
  +  private void sendNotifications(RegisteredNotification rn)
     {
       // Keep going until we have done all outstanding notifications.
       // The loop ends when not active, or there are no outstanding
  @@ -414,7 +443,7 @@
       {
         // Do we actually send it?
         // Yes, unless start and not sending past notifications.
  -      if (doSend != SEND_NO)
  +      if (rn.sendType != SEND_NO)
         {
           long seq = 0;
           synchronized (this)
  @@ -422,23 +451,29 @@
             seq = ++sequenceNumber;
           }
           sendNotification(new TimerNotification(rn.type, this, seq,
  -                         System.currentTimeMillis(), rn.message, rn.id));
  +                         rn.nextDate, rn.message, rn.id));
         }
   
         // Calculate the next date.
  -      // Except for we are sending past notifications at start up,
  +      // Except for when we are sending past notifications at start up,
         // it cannot be in the future.
         do
         {
           // If no next run, remove it sets the next date to zero.
  -        if (!rn.calcNextDate())
  +        if (rn.calcNextDate() == false)
           {
             notifications.remove(rn.id);
           }
         }
  -      while (isActive() && doSend != SEND_START && rn.nextDate != 0
  +      while (isActive() == true && rn.sendType != SEND_START && rn.nextDate != 0
                && rn.nextDate < System.currentTimeMillis());
       }
  +
  +    // We've finished with this notification
  +    rn.running = false;
  +
  +    // Work out if this is the next to run
  +    reschedule(rn, true);
     }
   
     // Inner classes -------------------------------------------------
  @@ -447,7 +482,7 @@
      * A registered notification. These run as separate threads.
      */
     private class RegisteredNotification
  -    extends Thread
  +    implements Runnable
     {
       // Attributes ----------------------------------------------------
   
  @@ -491,6 +526,16 @@
        */
       public long nextDate;
   
  +    /**
  +     * The send type, no send, past notifications or normal
  +     */
  +    public int sendType = SEND_NORMAL;
  +
  +    /**
  +     * Whether we are already in a notification
  +     */
  +    public boolean running = false;
  +
       // Constructors --------------------------------------------------
   
       /**
  @@ -537,10 +582,9 @@
         while (nextDate < System.currentTimeMillis())
         {
           // If we have no more occurences its an error
  -        if (!calcNextDate())
  +        if (calcNextDate() == false)
             throw new IllegalArgumentException("Requested notification(s) " +
                                                "in the past.");
  -
         }
       }
   
  @@ -574,48 +618,106 @@
         return true;
       }
   
  -    // Thread overrides ----------------------------------------------
  +    // Runnable overrides -------------------------------------------
   
       /**
  -     * Wait until the next run date or until we are interrupted by
  -     * a remove or stop. When active, send any outstanding notifications.
  -     *
  +     * Send the notifications.
        */
       public void run()
       {
  -      // Work out the startup send mode
  -      int doSend = SEND_NO;
  -      if (sendPastNotifications)
  -        doSend = SEND_START;
  -
  -      // Keep going until we are stopped
  -      while (isActive() && nextDate != 0)
  -      {
           // Send any notifications
  -        sendNotifications(this, doSend);
  +        sendNotifications(this);
  +    }
  +  }
   
  -        // We have done the initial send, switch to normal mode
  -        doSend = SEND_NORMAL;
  +  /**
  +   * The controlling thread.
  +   */
  +  private class Controller
  +    extends Thread
  +  {
  +    // Attributes ----------------------------------------------------
   
  -        // Wait until we the next notification
  -        doWait();
  -      }
  +    /**
  +     * The thread pool used to obtain threads to send the notifications.
  +     */
  +    ThreadPool threadPool;
  +  
  +    // Constructors --------------------------------------------------
  +
  +    /**
  +     * This controller runs in a different thread as a daemon.
  +     * It has a thread pool to run notifications.
  +     */
  +    public Controller()
  +    {
  +       super();
  +       setDaemon(true);
  +       threadPool = new ThreadPool();
       }
   
  +    // Public --------------------------------------------------------
  +
  +    /**
  +     * Start the thread pool, before starting this thread
  +     */
  +    public void start()
  +    {
  +      threadPool.setActive(true);
  +      super.start();
  +    }
  +
  +    /**
  +     * Stop the thread pool, don't stop the thread
  +     */
  +    public void cancel()
  +    {
  +      threadPool.setActive(false);
  +    }
  +
       /**
        * Wait until the next run date or until we are interrupted by
  -     * remove or stop.
  +     * a reschedule. When active, check the registered notifications
  +     * to see whether notifications should be sent.
        */
  -    private synchronized void doWait()
  +    public void run()
       {
  -      try
  -      {
  -        long waitMillis = nextDate - System.currentTimeMillis();
  -        if (waitMillis > 0)
  -          wait(waitMillis);
  -      }
  -      catch (InterruptedException ignored)
  +      // Keep going until we are stopped
  +      while (isActive())
         {
  +        Iterator iterator = notifications.values().iterator();
  +        while (iterator.hasNext())
  +        {
  +          RegisteredNotification rn = (RegisteredNotification) iterator.next();
  +
  +          // Notification required
  +          if (isActive() && rn.running == false 
  +              && rn.nextDate <= System.currentTimeMillis())
  +          {
  +            rn.running = true;
  +            threadPool.run(rn);
  +          }
  +        }
  +
  +        // Reschedule the inactive notifications
  +        reschedule();
  +
  +        // Wait until the next run or we are interrupted
  +        if (isActive())
  +        {
  +          try
  +          {
  +            if (nextRunDate == 0)
  +              sleep(0);
  +            else
  +            {
  +              long waitMillis = nextRunDate - System.currentTimeMillis();
  +              if (waitMillis > 0)
  +                sleep(waitMillis);
  +            }
  +          }
  +          catch (InterruptedException ignored) {}
  +        }
         }
       }
     }
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to