User: ejort Date: 02/01/18 12:32:42 Modified: src/main/javax/management/timer Timer.java Log: Thread Pooling of Services Revision Changes Path 1.4 +210 -108 jmx/src/main/javax/management/timer/Timer.java Index: Timer.java =================================================================== RCS file: /cvsroot/jboss/jmx/src/main/javax/management/timer/Timer.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- Timer.java 2001/12/17 23:46:06 1.3 +++ Timer.java 2002/01/18 20:32:42 1.4 @@ -9,8 +9,7 @@ import java.io.Serializable; import java.util.Date; -import java.util.Enumeration; -import java.util.Hashtable; +import java.util.Iterator; import java.util.Vector; import javax.management.InstanceNotFoundException; @@ -19,13 +18,15 @@ import javax.management.NotificationBroadcasterSupport; import javax.management.ObjectName; -// REVIEW: Check synchronization +import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap; +import org.jboss.mx.util.ThreadPool; + /** * The timer service. * * @author <a href="mailto:[EMAIL PROTECTED]">Adrian Brock</a> - * @version $Revision: 1.3 $ + * @version $Revision: 1.4 $ */ public class Timer extends NotificationBroadcasterSupport @@ -97,8 +98,18 @@ /** * The registered notifications. + */ + ConcurrentReaderHashMap notifications = new ConcurrentReaderHashMap(); + + /** + * The controlling thread + */ + private Controller controller = new Controller(); + + /** + * The date next run of the notifications, zero when nothing to run. */ - Hashtable notifications = new Hashtable(); + long nextRunDate = 0; // Static -------------------------------------------------------- @@ -122,8 +133,7 @@ return addNotification(type, message, userData, date, period, 0); } - // REVIEW: Synchronized because of active/thread start - public synchronized Integer addNotification(String type, String message, + public Integer addNotification(String type, String message, Object userData, Date date, long period, long occurences) throws IllegalArgumentException { @@ -143,9 +153,8 @@ // Add the registration. notifications.put(id, rn); - // If we are active, start the thread - if (active) - rn.start(); + // Work out if this is the next to run + reschedule(rn, true); return id; } @@ -158,7 +167,7 @@ public Date getDate(Integer id) { // Make sure there is a registration - RegisteredNotification rn = (RegisteredNotification)notifications.get(id); + RegisteredNotification rn = (RegisteredNotification) notifications.get(id); if (rn == null) return null; @@ -174,7 +183,7 @@ public Long getNbOccurences(Integer id) { // Make sure there is a registration - RegisteredNotification rn = (RegisteredNotification)notifications.get(id); + RegisteredNotification rn = (RegisteredNotification) notifications.get(id); if (rn == null) return null; @@ -184,34 +193,35 @@ public Vector getNotificationIDs(String type) { - RegisteredNotification rn = null; Vector result = new Vector(); // Loop through the notifications looking for the passed type. - for (Enumeration e = notifications.elements(); e.hasMoreElements();) + Iterator iterator = notifications.values().iterator(); + while (iterator.hasNext()) { - rn = (RegisteredNotification) e.nextElement(); + RegisteredNotification rn = (RegisteredNotification) iterator.next(); if (rn.type.equals(type)) result.add(rn.id); } + return result; } public String getNotificationMessage(Integer id) { // Make sure there is a registration - RegisteredNotification rn = (RegisteredNotification)notifications.get(id); + RegisteredNotification rn = (RegisteredNotification) notifications.get(id); if (rn == null) return null; - // Return the message. + // Return the message return rn.message; } public String getNotificationType(Integer id) { // Make sure there is a registration - RegisteredNotification rn = (RegisteredNotification)notifications.get(id); + RegisteredNotification rn = (RegisteredNotification) notifications.get(id); if (rn == null) return null; @@ -222,7 +232,7 @@ public Object getNotificationUserData(Integer id) { // Make sure there is a registration - RegisteredNotification rn = (RegisteredNotification)notifications.get(id); + RegisteredNotification rn = (RegisteredNotification) notifications.get(id); if (rn == null) return null; @@ -233,7 +243,7 @@ public Long getPeriod(Integer id) { // Make sure there is a registration - RegisteredNotification rn = (RegisteredNotification)notifications.get(id); + RegisteredNotification rn = (RegisteredNotification) notifications.get(id); if (rn == null) return null; @@ -256,70 +266,49 @@ return notifications.isEmpty(); } - // REVIEW: Synchronized because of active/thread stop and enumeration - public synchronized void removeAllNotifications() + public void removeAllNotifications() { - RegisteredNotification rn = null; + // Remove the notifications + notifications.clear(); - // Loop through the notifications removing everything. - for (Enumeration e = notifications.elements(); e.hasMoreElements();) + // The spec says to reset the identifiers, seems like a bad idea to me + synchronized (this) { - rn = (RegisteredNotification) e.nextElement(); - notifications.remove(rn.id); - - // Stop the thread - rn.nextDate = 0; - if (active) - rn.interrupt(); + nextId = 0; } - - // The spec says to reset the identifiers, seems like a bad idea to me - nextId = 0; } - // REVIEW: Synchronized because of active/thread stop and enumeration - public synchronized void removeNotification(Integer id) + public void removeNotification(Integer id) throws InstanceNotFoundException { // Check if there is a notification. - RegisteredNotification rn = (RegisteredNotification)notifications.get(id); + RegisteredNotification rn = (RegisteredNotification) notifications.get(id); if (rn == null) throw new InstanceNotFoundException("No notification id : " + id.toString()); // Remove the notification notifications.remove(id); - - // Stop the thread - rn.nextDate = 0; - if (active) - rn.interrupt(); } - // REVIEW: Synchronized because of active/thread stop and enumeration - public synchronized void removeNotifications(String type) + public void removeNotifications(String type) throws InstanceNotFoundException { - RegisteredNotification rn = null; boolean found = false; // Loop through the notifications removing the passed type. - for (Enumeration e = notifications.elements(); e.hasMoreElements();) + Iterator iterator = notifications.values().iterator(); + while (iterator.hasNext()) { - rn = (RegisteredNotification) e.nextElement(); + RegisteredNotification rn = (RegisteredNotification) iterator.next(); if (rn.type.equals(type)) { - notifications.remove(rn.id); + iterator.remove(); found = true; - - // Stop the thread - rn.nextDate = 0; - if (active) - rn.interrupt(); } } - // The spec says to through an error when nothing removed. + // The spec says to through an exception when nothing removed. if (found == false) throw new InstanceNotFoundException("Nothing registered for type: " + type); @@ -330,36 +319,40 @@ sendPastNotifications = value; } - // REVIEW: Synchronized because of active/thread stop and enumeration public synchronized void start() { // Ignore if already active - if (active) + if (active == true) return; active = true; - // Start the threads for each notification - for (Enumeration e = notifications.elements(); e.hasMoreElements();) + // Perform the initial sends, for past notifications send missed events + // otherwise ignore them + Iterator iterator = notifications.values().iterator(); + while (iterator.hasNext()) { - ((Thread) e.nextElement()).start(); + RegisteredNotification rn = (RegisteredNotification) iterator.next(); + if (sendPastNotifications) + rn.sendType = SEND_START; + else + rn.sendType = SEND_NO; + sendNotifications(rn); + rn.sendType = SEND_NORMAL; } + + // Start the controlling thread + controller.start(); } - // REVIEW: Synchronized because of active/thread stop and enumeration public synchronized void stop() { // Ignore if not active - if (!active) + if (active == false) return; // Stop the threads active = false; - - // Ask each thread to stop - for (Enumeration e = notifications.elements(); e.hasMoreElements();) - { - ((Thread) e.nextElement()).interrupt(); - } + controller.cancel(); } // MBeanRegistrationImplementation overrides --------------------- @@ -393,15 +386,51 @@ // Private ------------------------------------------------------- /** + * Recalculate the next run for inactive notifications. + */ + private void reschedule() + { + // No need to reschedule when not active + if (isActive() == false) + return; + + nextRunDate = 0; + + // Loop through the registered notifications to find the next run. + Iterator iterator = notifications.values().iterator(); + while (iterator.hasNext()) + { + RegisteredNotification rn = (RegisteredNotification) iterator.next(); + if (rn.running == false) + reschedule(rn, false); + } + } + + /** + * See whether this notification is the next one. + * + * @param rn the notification to check + * @param notify whether to notify the controller of changes + */ + private void reschedule(RegisteredNotification rn, boolean notify) + { + synchronized (this) + { + if (nextRunDate == 0 || rn.nextDate < nextRunDate) + { + nextRunDate = rn.nextDate; + if (notify == true) + controller.interrupt(); + } + } + } + + /** * Send any outstanding notifications. * * @param rn the registered notification to send. - * @param doSend whether the send operations should be performed. - * Pass SEND_NO for startup and not sending past notification, - * pass SEND_START for startup and sending past notifications, - * pass SEND_NORMAL for normal operation. */ - private void sendNotifications(RegisteredNotification rn, int doSend) + private void sendNotifications(RegisteredNotification rn) { // Keep going until we have done all outstanding notifications. // The loop ends when not active, or there are no outstanding @@ -414,7 +443,7 @@ { // Do we actually send it? // Yes, unless start and not sending past notifications. - if (doSend != SEND_NO) + if (rn.sendType != SEND_NO) { long seq = 0; synchronized (this) @@ -422,23 +451,29 @@ seq = ++sequenceNumber; } sendNotification(new TimerNotification(rn.type, this, seq, - System.currentTimeMillis(), rn.message, rn.id)); + rn.nextDate, rn.message, rn.id)); } // Calculate the next date. - // Except for we are sending past notifications at start up, + // Except for when we are sending past notifications at start up, // it cannot be in the future. do { // If no next run, remove it sets the next date to zero. - if (!rn.calcNextDate()) + if (rn.calcNextDate() == false) { notifications.remove(rn.id); } } - while (isActive() && doSend != SEND_START && rn.nextDate != 0 + while (isActive() == true && rn.sendType != SEND_START && rn.nextDate != 0 && rn.nextDate < System.currentTimeMillis()); } + + // We've finished with this notification + rn.running = false; + + // Work out if this is the next to run + reschedule(rn, true); } // Inner classes ------------------------------------------------- @@ -447,7 +482,7 @@ * A registered notification. These run as separate threads. */ private class RegisteredNotification - extends Thread + implements Runnable { // Attributes ---------------------------------------------------- @@ -491,6 +526,16 @@ */ public long nextDate; + /** + * The send type, no send, past notifications or normal + */ + public int sendType = SEND_NORMAL; + + /** + * Whether we are already in a notification + */ + public boolean running = false; + // Constructors -------------------------------------------------- /** @@ -537,10 +582,9 @@ while (nextDate < System.currentTimeMillis()) { // If we have no more occurences its an error - if (!calcNextDate()) + if (calcNextDate() == false) throw new IllegalArgumentException("Requested notification(s) " + "in the past."); - } } @@ -574,48 +618,106 @@ return true; } - // Thread overrides ---------------------------------------------- + // Runnable overrides ------------------------------------------- /** - * Wait until the next run date or until we are interrupted by - * a remove or stop. When active, send any outstanding notifications. - * + * Send the notifications. */ public void run() { - // Work out the startup send mode - int doSend = SEND_NO; - if (sendPastNotifications) - doSend = SEND_START; - - // Keep going until we are stopped - while (isActive() && nextDate != 0) - { // Send any notifications - sendNotifications(this, doSend); + sendNotifications(this); + } + } - // We have done the initial send, switch to normal mode - doSend = SEND_NORMAL; + /** + * The controlling thread. + */ + private class Controller + extends Thread + { + // Attributes ---------------------------------------------------- - // Wait until we the next notification - doWait(); - } + /** + * The thread pool used to obtain threads to send the notifications. + */ + ThreadPool threadPool; + + // Constructors -------------------------------------------------- + + /** + * This controller runs in a different thread as a daemon. + * It has a thread pool to run notifications. + */ + public Controller() + { + super(); + setDaemon(true); + threadPool = new ThreadPool(); } + // Public -------------------------------------------------------- + + /** + * Start the thread pool, before starting this thread + */ + public void start() + { + threadPool.setActive(true); + super.start(); + } + + /** + * Stop the thread pool, don't stop the thread + */ + public void cancel() + { + threadPool.setActive(false); + } + /** * Wait until the next run date or until we are interrupted by - * remove or stop. + * a reschedule. When active, check the registered notifications + * to see whether notifications should be sent. */ - private synchronized void doWait() + public void run() { - try - { - long waitMillis = nextDate - System.currentTimeMillis(); - if (waitMillis > 0) - wait(waitMillis); - } - catch (InterruptedException ignored) + // Keep going until we are stopped + while (isActive()) { + Iterator iterator = notifications.values().iterator(); + while (iterator.hasNext()) + { + RegisteredNotification rn = (RegisteredNotification) iterator.next(); + + // Notification required + if (isActive() && rn.running == false + && rn.nextDate <= System.currentTimeMillis()) + { + rn.running = true; + threadPool.run(rn); + } + } + + // Reschedule the inactive notifications + reschedule(); + + // Wait until the next run or we are interrupted + if (isActive()) + { + try + { + if (nextRunDate == 0) + sleep(0); + else + { + long waitMillis = nextRunDate - System.currentTimeMillis(); + if (waitMillis > 0) + sleep(waitMillis); + } + } + catch (InterruptedException ignored) {} + } } } }
_______________________________________________ Jboss-development mailing list [EMAIL PROTECTED] https://lists.sourceforge.net/lists/listinfo/jboss-development