User: ejort Date: 02/01/18 12:32:42 Modified: src/main/javax/management/monitor Monitor.java Log: Thread Pooling of Services Revision Changes Path 1.2 +614 -460 jmx/src/main/javax/management/monitor/Monitor.java Index: Monitor.java =================================================================== RCS file: /cvsroot/jboss/jmx/src/main/javax/management/monitor/Monitor.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- Monitor.java 2001/12/21 01:39:37 1.1 +++ Monitor.java 2002/01/18 20:32:42 1.2 @@ -1,460 +1,614 @@ -/* -* JBoss, the OpenSource EJB server -* -* Distributable under LGPL license. -* See terms of license at gnu.org. -*/ -package javax.management.monitor; - -import java.io.Serializable; - -import javax.management.InstanceNotFoundException; -import javax.management.MBeanAttributeInfo; -import javax.management.MBeanInfo; -import javax.management.MBeanRegistration; -import javax.management.MBeanServer; -import javax.management.NotificationBroadcasterSupport; -import javax.management.ObjectName; - -// REVIEW: Check synchronization - -/** - * The monitor service. - * - * @author <a href="mailto:[EMAIL PROTECTED]">Adrian Brock</a> - * @version $Revision: 1.1 $ - * - */ -public abstract class Monitor - extends NotificationBroadcasterSupport - implements MonitorMBean, MBeanRegistration, Serializable -{ - // Constants ----------------------------------------------------- - - /** - * Used to reset errors in {@link #alreadyNotified}. - * REVIEW: Check - */ - protected final int RESET_FLAGS_ALREADY_NOTIFIED = 0; - - /** - * An observed attribute type error has been notified. - * REVIEW: Check - */ - protected final int RUNTIME_ERROR_NOTIFIED = 1; - - /** - * An observed object error has been notified. - * REVIEW: Check - */ - protected final int OBSERVED_OBJECT_ERROR_NOTIFIED = 2; - - /** - * An observed attribute error has been notified. - * REVIEW: Check - */ - protected final int OBSERVED_ATTRIBUTE_ERROR_NOTIFIED = 4; - - /** - * An observed attribute type error has been notified. - * REVIEW: Check - */ - protected final int OBSERVED_ATTRIBUTE_TYPE_ERROR_NOTIFIED = 8; - - // Attributes ---------------------------------------------------- - - /** - * The granularity period. - */ - long granularityPeriod = 10000; - - /** - * The observed attribute. - */ - String observedAttribute = null; - - /** - * The observed object. - */ - ObjectName observedObject = null; - - /** - * Whether the service is active. - */ - boolean active = false; - - /** - * The server this service is registered in. - */ - protected MBeanServer server; - - /** - * The errors that have already been notified. - * REVIEW: Check - */ - protected int alreadyNotified = 0; - - /** - * ????. - * REVIEW: Implement - */ - protected String dbgTag = null; - - /** - * The monitor thread for this monitor. - */ - private MonitorThread monitorThread; - - /** - * The notification sequence number. - */ - private long sequenceNumber; - - // Static -------------------------------------------------------- - - // Constructors -------------------------------------------------- - - /** - * Construct a new monitor with the default values. - */ - public Monitor() - { - monitorThread = new MonitorThread(this); - } - - // Public -------------------------------------------------------- - - // MonitorMBean implementation ----------------------------------- - - public long getGranularityPeriod() - { - return granularityPeriod; - } - - public String getObservedAttribute() - { - return observedAttribute; - } - - public ObjectName getObservedObject() - { - return observedObject; - } - - public boolean isActive() - { - return active; - } - public void setGranularityPeriod(long period) - throws IllegalArgumentException - { - if (period <= 0) - throw new IllegalArgumentException("Period must be positive."); - granularityPeriod = period; - } - - public void setObservedAttribute(String attribute) - { - observedAttribute = attribute; - // REVIEW: not find grained enough? - alreadyNotified = RESET_FLAGS_ALREADY_NOTIFIED; - } - - public void setObservedObject(ObjectName object) - { - observedObject = object; - // REVIEW: not find grained enough? - alreadyNotified = RESET_FLAGS_ALREADY_NOTIFIED; - } - - // REVIEW: Synchronized because of active/thread stop and enumeration - public synchronized void start() - { - // Ignore if already active - if (active) - return; - active = true; - - // Start the monitor thread - monitorThread.start(); - } - - // REVIEW: Synchronized because of active/thread stop and enumeration - public synchronized void stop() - { - // Ignore if not active - if (!active) - return; - - // Stop the monitor thread - active = false; - monitorThread.interrupt(); - } - - // MBeanRegistrationImplementation overrides --------------------- - - public ObjectName preRegister(MBeanServer server, ObjectName objectName) - throws Exception - { - // Remember the server. - this.server = server; - - // Use the passed object name. - return objectName; - } - - public void postRegister(Boolean registrationDone) - { - } - - public void preDeregister() - throws Exception - { - // Stop the monitor before deregistration. - stop(); - } - - public void postDeregister() - { - } - - // Package protected --------------------------------------------- - - /** - * Run the monitor.<p> - * - * Retrieves the monitored attribute and passes it to each service.<p> - * - * Peforms the common error processing. - * REVIEW: Use the internal interface? - */ - void runMonitor() - { - // Monitor for uncaught errors - try - { - MBeanInfo mbeanInfo = null; - try - { - mbeanInfo = server.getMBeanInfo(observedObject); - } - catch (InstanceNotFoundException e) - { - sendObjectErrorNotification("The observed object is not registered."); - return; - } - - // Get the attribute information - MBeanAttributeInfo[] mbeanAttributeInfo = mbeanInfo.getAttributes(); - MBeanAttributeInfo attributeInfo = null; - for (int i = 0; i < mbeanAttributeInfo.length; i++) - { - if (mbeanAttributeInfo[i].getName().equals(observedAttribute)) - { - attributeInfo = mbeanAttributeInfo[i]; - break; - } - } - - // The attribute must exist - if (attributeInfo == null) - { - sendAttributeErrorNotification( - "The observed attribute does not exist"); - return; - } - // The attribute must exist - if (!attributeInfo.isReadable()) - { - sendAttributeErrorNotification("Attribute not readable."); - return; - } - - // Determine the get method. - String methodName = null; - if (attributeInfo.isIs()) - methodName = "is" + observedAttribute; - else - methodName = "get" + observedAttribute; - - // Get the value - Object value = null; - try - { - value = server.invoke(observedObject, methodName, - new Object[0], new String[0]); - } - catch (InstanceNotFoundException e) - { - sendObjectErrorNotification("The observed object is not registered."); - return; - } - - // Check for null value - if (value == null) - { - sendAttributeTypeErrorNotification("Attribute is null"); - return; - } - - // Now pass the value to the respective monitor. - monitor(attributeInfo, value); - } - // Notify an unexcepted error - catch (Exception e) - { - sendRuntimeErrorNotification("General error: " + e.toString()); - } - } - - /** - * Perform the monitor specific processing. - * - * @param attributeInfo the MBean attribute information. - * @param value the value to monitor. - */ - abstract void monitor(MBeanAttributeInfo attributeInfo, Object value) - throws Exception; - - /** - * Sends the notification - * - * @param type the notification type. - * @param timestamp the time of the notification. - * @param message the human readable message to send. - * @param attribute the attribute name. - * @param gauge the derived gauge. - * @param trigger the trigger value. - */ - void sendNotification(String type, long timestamp, String message, - String attribute, Object gauge, Object trigger) - { - long seq = 0; - synchronized (this) - { - seq = ++sequenceNumber; - } - if (timestamp == 0) - timestamp = System.currentTimeMillis(); - sendNotification(new MonitorNotification(type, this, seq, - timestamp, message, gauge, - attribute, observedObject, trigger)); - } - - /** - * Send a runtime error notification. - * - * @param message the human readable message to send. - */ - void sendRuntimeErrorNotification(String message) - { - if ((alreadyNotified & RUNTIME_ERROR_NOTIFIED) == 0) - sendNotification(MonitorNotification.RUNTIME_ERROR, 0, - message, null, null, null); - alreadyNotified |= RUNTIME_ERROR_NOTIFIED; - } - - /** - * Send an object error notification. - * - * @param message the human readable message to send. - */ - void sendObjectErrorNotification(String message) - { - if ((alreadyNotified & OBSERVED_OBJECT_ERROR_NOTIFIED) == 0) - sendNotification(MonitorNotification.OBSERVED_OBJECT_ERROR, 0, - message, null, null, null); - alreadyNotified |= OBSERVED_OBJECT_ERROR_NOTIFIED; - } - - /** - * Send an attribute error notification. - * - * @param message the human readable message to send. - */ - void sendAttributeErrorNotification(String message) - { - if ((alreadyNotified & OBSERVED_ATTRIBUTE_ERROR_NOTIFIED) == 0) - sendNotification(MonitorNotification.OBSERVED_ATTRIBUTE_ERROR, 0, - message, observedAttribute, null, null); - alreadyNotified |= OBSERVED_ATTRIBUTE_ERROR_NOTIFIED; - } - - /** - * Send an attribute type error notification. - * - * @param message the human readable message to send. - */ - void sendAttributeTypeErrorNotification(String message) - { - if ((alreadyNotified & OBSERVED_ATTRIBUTE_TYPE_ERROR_NOTIFIED) == 0) - sendNotification(MonitorNotification.OBSERVED_ATTRIBUTE_TYPE_ERROR, 0, - message, observedAttribute, null, null); - alreadyNotified |= OBSERVED_ATTRIBUTE_TYPE_ERROR_NOTIFIED; - } - - // Protected ----------------------------------------------------- - - // Private ------------------------------------------------------- - - // Inner classes ------------------------------------------------- - - /** - * A monitor thread. - */ - private class MonitorThread - extends Thread - { - // Attributes ---------------------------------------------------- - - // The monitoring to perform - private Monitor monitor; - - // Constructors -------------------------------------------------- - - // Public -------------------------------------------------------- - - /** - * Create a monitor thread to periodically perform monitoring. - * - * @param monitor the montioring to perform. - */ - public MonitorThread(Monitor monitor) - { - this.monitor = monitor; - } - - // Thread overrides ---------------------------------------------- - - /** - * Wait until the monitor time or until we are interrupted by - * stop. When active, run the monitor. - */ - public void run() - { - // Keep going until we are stopped - while (isActive()) - { - // Perform the monitoring - monitor.runMonitor(); - - // Wait until the next monitor time - doWait(); - } - } - - /** - * Wait until the next monitor. - */ - private synchronized void doWait() - { - try - { - wait(getGranularityPeriod()); - } - catch (InterruptedException ignored) - { - } - } - } -} +/* +* JBoss, the OpenSource EJB server +* +* Distributable under LGPL license. +* See terms of license at gnu.org. +*/ +package javax.management.monitor; + +import java.io.Serializable; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import javax.management.InstanceNotFoundException; +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanInfo; +import javax.management.MBeanRegistration; +import javax.management.MBeanServer; +import javax.management.NotificationBroadcasterSupport; +import javax.management.ObjectName; + +import org.jboss.mx.util.ThreadPool; + +/** + * The monitor service. + * + * @author <a href="mailto:[EMAIL PROTECTED]">Adrian Brock</a> + * @version $Revision: 1.2 $ + * + */ +public abstract class Monitor + extends NotificationBroadcasterSupport + implements MonitorMBean, MBeanRegistration, Serializable +{ + // Constants ----------------------------------------------------- + + /** + * Used to reset errors in {@link #alreadyNotified}. + * REVIEW: Check + */ + protected final int RESET_FLAGS_ALREADY_NOTIFIED = 0; + + /** + * An observed attribute type error has been notified. + * REVIEW: Check + */ + protected final int RUNTIME_ERROR_NOTIFIED = 1; + + /** + * An observed object error has been notified. + * REVIEW: Check + */ + protected final int OBSERVED_OBJECT_ERROR_NOTIFIED = 2; + + /** + * An observed attribute error has been notified. + * REVIEW: Check + */ + protected final int OBSERVED_ATTRIBUTE_ERROR_NOTIFIED = 4; + + /** + * An observed attribute type error has been notified. + * REVIEW: Check + */ + protected final int OBSERVED_ATTRIBUTE_TYPE_ERROR_NOTIFIED = 8; + + // Attributes ---------------------------------------------------- + + /** + * The granularity period. + */ + long granularityPeriod = 10000; + + /** + * The observed attribute. + */ + String observedAttribute = null; + + /** + * The observed object. + */ + ObjectName observedObject = null; + + /** + * Whether the service is active. + */ + boolean active = false; + + /** + * The server this service is registered in. + */ + protected MBeanServer server; + + /** + * The errors that have already been notified. + * REVIEW: Check + */ + protected int alreadyNotified = 0; + + /** + * ????. + * REVIEW: Implement + */ + protected String dbgTag = null; + + /** + * The runnable monitor. + */ + private MonitorRunnable monitorRunnable; + + /** + * The notification sequence number. + */ + private long sequenceNumber; + + // Static -------------------------------------------------------- + + /** + * The controlling thread. + */ + static Controller controller = new Controller(); + + /** + * The next run date of the next monitor. + */ + static long nextRunDate; + + /** + * The monitor runnables + */ + static Set monitors = Collections.synchronizedSet(new HashSet()); + + /** + * Add a monitor to the active list + */ + static void add(Monitor monitor) + { + monitors.add(monitor.monitorRunnable); + } + + /** + * Remove a monitor from the active list + */ + static void remove(Monitor monitor) + { + monitors.remove(monitor.monitorRunnable); + } + + /** + * Recalculate the next run for active monitors but not running monitors. + */ + static void reschedule() + { + // Loop through the monitors to find the next run. + synchronized (monitors) + { + nextRunDate = 0; + Iterator iterator = monitors.iterator(); + while (iterator.hasNext()) + { + MonitorRunnable runnable = (MonitorRunnable) iterator.next(); + if (runnable.running == false) + runnable.reschedule(false); + } + } + } + + // Constructors -------------------------------------------------- + + // Public -------------------------------------------------------- + + // MonitorMBean implementation ----------------------------------- + + public long getGranularityPeriod() + { + return granularityPeriod; + } + + public String getObservedAttribute() + { + return observedAttribute; + } + + public ObjectName getObservedObject() + { + return observedObject; + } + + public boolean isActive() + { + return active; + } + public void setGranularityPeriod(long period) + throws IllegalArgumentException + { + if (period <= 0) + throw new IllegalArgumentException("Period must be positive."); + granularityPeriod = period; + } + + public void setObservedAttribute(String attribute) + { + observedAttribute = attribute; + // REVIEW: not find grained enough? + alreadyNotified = RESET_FLAGS_ALREADY_NOTIFIED; + } + + public void setObservedObject(ObjectName object) + { + observedObject = object; + // REVIEW: not find grained enough? + alreadyNotified = RESET_FLAGS_ALREADY_NOTIFIED; + } + + public synchronized void start() + { + // Ignore if already active + if (active) + return; + active = true; + + // Start the monitor runnable + monitorRunnable = new MonitorRunnable(this); + } + + public synchronized void stop() + { + // Ignore if not active + if (!active) + return; + + // Stop the monitor runnable + active = false; + monitors.remove(monitorRunnable); + } + + // MBeanRegistrationImplementation overrides --------------------- + + public ObjectName preRegister(MBeanServer server, ObjectName objectName) + throws Exception + { + // Remember the server. + this.server = server; + + // Use the passed object name. + return objectName; + } + + public void postRegister(Boolean registrationDone) + { + } + + public void preDeregister() + throws Exception + { + // Stop the monitor before deregistration. + stop(); + } + + public void postDeregister() + { + } + + // Package protected --------------------------------------------- + + /** + * Run the monitor.<p> + * + * Retrieves the monitored attribute and passes it to each service.<p> + * + * Peforms the common error processing. + * REVIEW: Use the internal interface???? + */ + void runMonitor() + { + // Monitor for uncaught errors + try + { + MBeanInfo mbeanInfo = null; + try + { + mbeanInfo = server.getMBeanInfo(observedObject); + } + catch (InstanceNotFoundException e) + { + sendObjectErrorNotification("The observed object is not registered."); + return; + } + + // Get the attribute information + MBeanAttributeInfo[] mbeanAttributeInfo = mbeanInfo.getAttributes(); + MBeanAttributeInfo attributeInfo = null; + for (int i = 0; i < mbeanAttributeInfo.length; i++) + { + if (mbeanAttributeInfo[i].getName().equals(observedAttribute)) + { + attributeInfo = mbeanAttributeInfo[i]; + break; + } + } + + // The attribute must exist + if (attributeInfo == null) + { + sendAttributeErrorNotification( + "The observed attribute does not exist"); + return; + } + // The attribute must exist + if (!attributeInfo.isReadable()) + { + sendAttributeErrorNotification("Attribute not readable."); + return; + } + + // Determine the get method. + String methodName = null; + if (attributeInfo.isIs()) + methodName = "is" + observedAttribute; + else + methodName = "get" + observedAttribute; + + // Get the value + Object value = null; + try + { + value = server.invoke(observedObject, methodName, + new Object[0], new String[0]); + } + catch (InstanceNotFoundException e) + { + sendObjectErrorNotification("The observed object is not registered."); + return; + } + + // Check for null value + if (value == null) + { + sendAttributeTypeErrorNotification("Attribute is null"); + return; + } + + // Now pass the value to the respective monitor. + monitor(attributeInfo, value); + } + // Notify an unexcepted error + catch (Exception e) + { + sendRuntimeErrorNotification("General error: " + e.toString()); + } + } + + /** + * Perform the monitor specific processing. + * + * @param attributeInfo the MBean attribute information. + * @param value the value to monitor. + */ + abstract void monitor(MBeanAttributeInfo attributeInfo, Object value) + throws Exception; + + /** + * Sends the notification + * + * @param type the notification type. + * @param timestamp the time of the notification. + * @param message the human readable message to send. + * @param attribute the attribute name. + * @param gauge the derived gauge. + * @param trigger the trigger value. + */ + void sendNotification(String type, long timestamp, String message, + String attribute, Object gauge, Object trigger) + { + long seq = 0; + synchronized (this) + { + seq = ++sequenceNumber; + } + if (timestamp == 0) + timestamp = System.currentTimeMillis(); + sendNotification(new MonitorNotification(type, this, seq, + timestamp, message, gauge, + attribute, observedObject, trigger)); + } + + /** + * Send a runtime error notification. + * + * @param message the human readable message to send. + */ + void sendRuntimeErrorNotification(String message) + { + if ((alreadyNotified & RUNTIME_ERROR_NOTIFIED) == 0) + sendNotification(MonitorNotification.RUNTIME_ERROR, 0, + message, null, null, null); + alreadyNotified |= RUNTIME_ERROR_NOTIFIED; + } + + /** + * Send an object error notification. + * + * @param message the human readable message to send. + */ + void sendObjectErrorNotification(String message) + { + if ((alreadyNotified & OBSERVED_OBJECT_ERROR_NOTIFIED) == 0) + sendNotification(MonitorNotification.OBSERVED_OBJECT_ERROR, 0, + message, null, null, null); + alreadyNotified |= OBSERVED_OBJECT_ERROR_NOTIFIED; + } + + /** + * Send an attribute error notification. + * + * @param message the human readable message to send. + */ + void sendAttributeErrorNotification(String message) + { + if ((alreadyNotified & OBSERVED_ATTRIBUTE_ERROR_NOTIFIED) == 0) + sendNotification(MonitorNotification.OBSERVED_ATTRIBUTE_ERROR, 0, + message, observedAttribute, null, null); + alreadyNotified |= OBSERVED_ATTRIBUTE_ERROR_NOTIFIED; + } + + /** + * Send an attribute type error notification. + * + * @param message the human readable message to send. + */ + void sendAttributeTypeErrorNotification(String message) + { + if ((alreadyNotified & OBSERVED_ATTRIBUTE_TYPE_ERROR_NOTIFIED) == 0) + sendNotification(MonitorNotification.OBSERVED_ATTRIBUTE_TYPE_ERROR, 0, + message, observedAttribute, null, null); + alreadyNotified |= OBSERVED_ATTRIBUTE_TYPE_ERROR_NOTIFIED; + } + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + // Inner classes ------------------------------------------------- + + /** + * A monitor runnable. + */ + private class MonitorRunnable + implements Runnable + { + // Attributes ---------------------------------------------------- + + // The monitoring to perform + private Monitor monitor; + + /** + * The next monitor date. + */ + long nextDate = 0; + + /** + * Is this running + */ + boolean running = false; + + // Constructors -------------------------------------------------- + + /** + * Create a monitor runnable to periodically perform monitoring. + * It will add it to the list of monitors to run. + * + * @param monitor the montioring to perform. + */ + public MonitorRunnable(Monitor monitor) + { + this.monitor = monitor; + monitors.add(this); + reschedule(true); + } + + // Public -------------------------------------------------------- + + /** + * Calculate the next notification date. Add on the period until + * the number of occurences is exhausted. + * + * @return false when there are no more occurences, true otherwise. + */ + void calcNextDate() + { + // Calculate the next occurence + nextDate = System.currentTimeMillis() + monitor.getGranularityPeriod(); + } + + /** + * See whether this monitor is the next one. + * + * @param runnable the monitor runnable to check + * @param notify whether to notify the controller of changes + */ + void reschedule(boolean notify) + { + synchronized (this) + { + if (nextRunDate == 0 || nextDate < nextRunDate) + { + nextRunDate = nextDate; + if (notify == true) + controller.interrupt(); + } + } + } + + // Runnable overrides ------------------------------------------- + + /** + * Run the montior + */ + public void run() + { + // Perform the monitoring + monitor.runMonitor(); + + // Calculate the next date + calcNextDate(); + + // We've finished with this notification + running = false; + + // Work out if this is the next to run + reschedule(true); + } + } + + /** + * The controlling thread. + */ + private static class Controller + extends Thread + { + // Attributes ---------------------------------------------------- + + /** + * The thread pool used to obtain threads to run monitors. + */ + ThreadPool threadPool; + + // Constructors -------------------------------------------------- + + /** + * This controller runs in a different thread as a daemon. + * It has a thread pool to run monitors. + */ + public Controller() + { + super(); + setDaemon(true); + threadPool = new ThreadPool(); + threadPool.setActive(true); + super.start(); + } + + // Public -------------------------------------------------------- + + // Thread Overrides----------------------------------------------- + + /** + * Wait until the next run date or until we are interrupted by + * a reschedule. When active, run the monitor + * to see whether notifications should be sent. + */ + public void run() + { + while (true) + { + synchronized (monitors) + { + Iterator iterator = monitors.iterator(); + while (iterator.hasNext()) + { + MonitorRunnable runnable = (MonitorRunnable) iterator.next(); + + // Run required? + if (runnable.running == false + && runnable.nextDate <= System.currentTimeMillis()) + { + runnable.running = true; + threadPool.run(runnable); + } + } + } + + // Reschedule the waiting monitors + reschedule(); + + // Wait until the next monitor required or we are interrupted + try + { + if (nextRunDate == 0) + sleep(0); + else + { + long waitMillis = nextRunDate - System.currentTimeMillis(); + if (waitMillis > 0) + sleep(waitMillis); + } + } + catch (InterruptedException ignored) {} + } + } + } +}
_______________________________________________ Jboss-development mailing list [EMAIL PROTECTED] https://lists.sourceforge.net/lists/listinfo/jboss-development