Author: asankha
Date: Fri Apr 25 01:07:36 2008
New Revision: 16145
Log:
fix issues with maintenence mode/graceful shutdown and core JMX control
Modified:
trunk/esb/java/modules/core/src/main/java/org/wso2/esb/ServiceBusManager.java
trunk/esb/java/modules/core/src/main/java/org/wso2/esb/jmx/mbean/ServiceBusControl.java
trunk/esb/java/repository/conf/sample/resources/misc/server/axis2.xml
Modified:
trunk/esb/java/modules/core/src/main/java/org/wso2/esb/ServiceBusManager.java
==============================================================================
---
trunk/esb/java/modules/core/src/main/java/org/wso2/esb/ServiceBusManager.java
(original)
+++
trunk/esb/java/modules/core/src/main/java/org/wso2/esb/ServiceBusManager.java
Fri Apr 25 01:07:36 2008
@@ -21,11 +21,17 @@
import org.apache.axiom.om.xpath.AXIOMXPath;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.description.Parameter;
+import org.apache.axis2.description.TransportInDescription;
+import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.transport.TransportListener;
+import org.apache.axis2.transport.TransportSender;
+import org.apache.axis2.AxisFault;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.derby.drda.NetworkServerControl;
import org.apache.synapse.ServerManager;
import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.transport.base.ManagementSupport;
import org.apache.synapse.config.SynapseConfiguration;
import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.registry.Registry;
@@ -90,6 +96,8 @@
/** Save the TCCL of the initial thread that starts the ESB for future
use. When JMX calls are
* received via RMI connections, re-start etc may otherwise fail due to
classloading issues */
private ClassLoader classLoader = null;
+ /** Is the service bus manager (i.e. this) started? */
+ private boolean serviceBusManagerStarted = false;
private ServiceBusManager() {}
@@ -102,15 +110,17 @@
/**
* Perform one time configuration for the JVM such as Log4J
initialization, and core
- * JMX Setup of the ServerView
+ * JMX Setup of the ServerView MBean
*/
public void init() {
if (alreadyInitialized) {
return;
}
+ // save web app classloader
classLoader = Thread.currentThread().getContextClassLoader();
+ // initialize log4j
URL log4jURL = null;
try {
log4jURL = new URL("file:" + sbc.getEsbHome() + File.separator
@@ -123,7 +133,9 @@
// register a JVM shutdown hook
Thread shutdownHook = new Thread() {
public void run() {
- shutdown();
+ if (serviceBusManagerStarted) {
+ shutdown();
+ }
}
};
shutdownHook.setName("WSO2 ESB Shutdown Hook");
@@ -131,16 +143,26 @@
HibernateConfigCache.clearCache();
+ // start JMX
startJMXService();
registerMBean(
ManagementFactory.getPlatformMBeanServer(), new
ServiceBusControl(),
sbc.getJmxAgentName() + ":Type=Admin,ConnectorName=ServerAdmin");
+ // ask Synapse transports to use the ESB agent name
+ System.setProperty(ServiceBusConstants.JMX_AGENT_NAME,
sbc.getJmxAgentName());
alreadyInitialized = true;
+ serviceBusManagerStarted = true;
}
+ /**
+ * Start the WSO2 ESB. This method can be invoked multiple times within a
JVM to re-start
+ * the ESB after an invocation of the stop() method. However, a shutdown()
would terminate
+ * the core ServerViewMBean and will prevent any further start()
invocations over JMX
+ */
public void start() {
if (!alreadyInitialized) {
+ // if one time (per JVM) initialization has not yet taken place,
do it..
init();
}
@@ -161,24 +183,223 @@
// generate UI pages and start JMX
generateUIPages();
registerMBeans();
+ log.info("[ESB] Start request completed");
}
+ /**
+ * Stops the ESB, and allow it to be re-started via the start() method
subsequently. This
+ * will stop the Synapse engine (and transports) and also unregister all
JMX MBeans, leaving
+ * only the core ServerView MBean that could be used to restart the ESB
+ * @throws ServiceBusException
+ */
public void stop() throws ServiceBusException {
log.info("Stopping the WSO2 Enterprise Service Bus..");
stopStatisticsReporter();
stopDataBaseServer();
ServerManager.getInstance().stop();
unregisterMBeans();
+ log.info("[ESB] Stop request completed");
}
+ /**
+ * Stop the ESB and the ServerView MBean and JMX control, and shutdown the
JVM
+ */
public void shutdown() {
stop();
stopJMXService();
log.info("[ESB] Shutdown completed");
+ serviceBusManagerStarted = false;
+
+ // wait a few seconds more just to be really sure..
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException ignore) {}
+
+ // if this was invoked by the shutdown hook, we should not call
System.exit
+ if (!Thread.currentThread().getName().endsWith("Hook")) {
+ System.exit(0);
+ }
}
- public void shutdownGracefully(long millis) {
- // TODO
+ /**
+ * Graceful shutdown of the ESB
+ *
+ * Put all transports that can go into maintenence mode into maintenence
+ * wait for all transport listener thread pools to reach zero
+ *
+ * @param waitMillis the maximum number of ms to wait until a graceful
shutdown is achieved,
+ * before forcing a shutdown
+ */
+ public void shutdownGracefully(long waitMillis) {
+
+ long endTime = System.currentTimeMillis() + waitMillis;
+ log.info("Requesting a graceful shutdown at : " + new Date() + " in a
maximum of : " +
+ (waitMillis/1000) + " seconds");
+
+ // put transports into maintenence
+ startMaintenance();
+
+ // wait till listeners are idle
+ boolean safeToShutdown = false;
+
+ while (!safeToShutdown) {
+
+ int pendingThreads = 0;
+ Map<String, TransportInDescription> trpIns =
+ configurationContext.getAxisConfiguration().getTransportsIn();
+ Map<String, TransportOutDescription> trpOuts =
+ configurationContext.getAxisConfiguration().getTransportsOut();
+
+ for (TransportInDescription trpIn : trpIns.values()) {
+ TransportListener trpLst = trpIn.getReceiver();
+
+ if (trpLst instanceof ManagementSupport) {
+ int inUse = ((ManagementSupport)
trpLst).getActiveThreadCount();
+ int inQue = ((ManagementSupport) trpLst).getQueueSize();
+
+ if (inUse+inQue > 0) {
+ log.info("Transport Listsner : " + trpIn.getName() +
+ " currently using : " + inUse + " threads with " +
+ inQue + " requests already queued..");
+
+ pendingThreads = (inUse + inQue);
+ }
+ }
+ }
+
+ for (TransportOutDescription trpOut : trpOuts.values()) {
+ TransportSender trpSnd = trpOut.getSender();
+
+ if (trpSnd instanceof ManagementSupport) {
+ int inUse = ((ManagementSupport)
trpSnd).getActiveThreadCount();
+ int inQue = ((ManagementSupport) trpSnd).getQueueSize();
+
+ if (inUse+inQue > 0) {
+ log.info("Transport Sender : " + trpOut.getName() +
+ " currently using : " + inUse + " threads with " +
+ inQue + " requests already queued..");
+
+ pendingThreads += (inUse + inQue);
+ }
+ }
+ }
+
+ int pendingCallbacks =
ServerManager.getInstance().pendingCallbacks();
+ if (pendingCallbacks > 0) {
+ log.info("Waiting for : " + pendingCallbacks + "
callbacks/replies..");
+ }
+
+ // safe to shutdown is when used listner threads and callbacks are
all zero
+ safeToShutdown = (pendingThreads + pendingCallbacks == 0);
+
+ if (safeToShutdown) {
+ log.info("All transport threads are idle and no pending
callbacks..");
+
+ } else {
+ if (System.currentTimeMillis() > endTime) {
+ log.info("The instance could not be gracefully shutdown in
: " +
+ (waitMillis/1000) + " seconds. Shutdown immidiate..");
+ safeToShutdown = true;
+ stop();
+
+ } else {
+ log.info("Waiting for a maximum of another " +
+ (endTime - System.currentTimeMillis())/1000 +
+ " seconds until transport threads to become idle, and
callbacks complete..");
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException ignore) {}
+ }
+ }
+ }
+ log.info("Graceful shutdown request completed in " +
+ (System.currentTimeMillis() - (endTime-waitMillis))/1000 + "
seconds");
+ }
+
+ /**
+ * Put transport listsners and senders into maintenence mode
+ */
+ public void startMaintenance() {
+ log.info("Putting transport listners and senders into maintenence
mode..");
+
+ // put listers into maintenence
+ Map<String, TransportInDescription> trpIns =
+ configurationContext.getAxisConfiguration().getTransportsIn();
+
+ for (TransportInDescription trpIn : trpIns.values()) {
+ TransportListener trpLst = trpIn.getReceiver();
+ if (trpLst instanceof ManagementSupport) {
+ try {
+ ((ManagementSupport) trpLst).pause();
+ } catch (AxisFault axisFault) {
+ log.error("Error putting transport listener for : "
+ + trpIn.getName() + " into maintenence");
+ }
+ }
+ }
+
+ // put senders into maintenence
+ Map<String, TransportOutDescription> trpOuts =
+ configurationContext.getAxisConfiguration().getTransportsOut();
+ for (TransportOutDescription trpOut : trpOuts.values()) {
+ TransportSender trpSnd = trpOut.getSender();
+ if (trpSnd instanceof ManagementSupport) {
+ try {
+ ((ManagementSupport) trpSnd).pause();
+ } catch (AxisFault axisFault) {
+ log.error("Error putting transport sender for : "
+ + trpOut.getName() + " into maintenence");
+ }
+ }
+ }
+
+ // put tasks on hold
+ // TODO $$$$$$$$$$$$$$$$4
+
+ log.info("[ESB] Entered maintenence mode");
+ }
+
+ /**
+ * Resume normal operations for transports already entered into
maintenence mode
+ */
+ public void endMaintenance() {
+ log.info("Resuming transport listners and senders from maintenence
mode..");
+
+ // put listers into maintenence
+ Map<String, TransportInDescription> trpIns =
+ configurationContext.getAxisConfiguration().getTransportsIn();
+
+ for (TransportInDescription trpIn : trpIns.values()) {
+ TransportListener trpLst = trpIn.getReceiver();
+ if (trpLst instanceof ManagementSupport) {
+ try {
+ ((ManagementSupport) trpLst).resume();
+ } catch (AxisFault axisFault) {
+ log.error("Error putting transport listener for : "
+ + trpIn.getName() + " into maintenence");
+ }
+ }
+ }
+
+ // put senders into maintenence
+ Map<String, TransportOutDescription> trpOuts =
+ configurationContext.getAxisConfiguration().getTransportsOut();
+ for (TransportOutDescription trpOut : trpOuts.values()) {
+ TransportSender trpSnd = trpOut.getSender();
+ if (trpSnd instanceof ManagementSupport) {
+ try {
+ ((ManagementSupport) trpSnd).resume();
+ } catch (AxisFault axisFault) {
+ log.error("Error putting transport sender for : "
+ + trpOut.getName() + " into maintenence");
+ }
+ }
+ }
+
+ // Resume tasks
+ // TODO $$$$$$$$$$$$$$$$4
+
+ log.info("[ESB] Resumed normal operations from maintenence mode");
}
/**
@@ -370,7 +591,7 @@
* Stop the statistics processing thread
*/
private void stopStatisticsReporter() {
- if (statisticsReporterThread != null) {
+ if (statisticsReporterThread != null &&
statisticsReporterThread.isAlive()) {
statisticsReporterThread.shutdown();
statisticsReporterThread.interrupt();
}
Modified:
trunk/esb/java/modules/core/src/main/java/org/wso2/esb/jmx/mbean/ServiceBusControl.java
==============================================================================
---
trunk/esb/java/modules/core/src/main/java/org/wso2/esb/jmx/mbean/ServiceBusControl.java
(original)
+++
trunk/esb/java/modules/core/src/main/java/org/wso2/esb/jmx/mbean/ServiceBusControl.java
Fri Apr 25 01:07:36 2008
@@ -50,15 +50,18 @@
sbm.shutdown();
}
- public void shutdownGracefully(long millis) throws Exception {
+ public void shutdownGracefully(long secs) throws Exception {
Thread.currentThread().setContextClassLoader(sbm.getClassLoader());
+ sbm.shutdownGracefully(secs * 1000);
}
public void startMaintenance() throws Exception {
Thread.currentThread().setContextClassLoader(sbm.getClassLoader());
+ sbm.startMaintenance();
}
public void endMaintenance() throws Exception {
Thread.currentThread().setContextClassLoader(sbm.getClassLoader());
+ sbm.endMaintenance();
}
}
Modified: trunk/esb/java/repository/conf/sample/resources/misc/server/axis2.xml
==============================================================================
--- trunk/esb/java/repository/conf/sample/resources/misc/server/axis2.xml
(original)
+++ trunk/esb/java/repository/conf/sample/resources/misc/server/axis2.xml
Fri Apr 25 01:07:36 2008
@@ -151,7 +151,7 @@
<parameter name="non-blocking" locked="false">true</parameter>
<parameter name="keystore" locked="false">
<KeyStore>
- <Location>identity.jks</Location>
+
<Location>../../webapp/WEB-INF/classes/conf/identity.jks</Location>
<Type>JKS</Type>
<Password>password</Password>
<KeyPassword>password</KeyPassword>
@@ -159,7 +159,7 @@
</parameter>
<parameter name="truststore" locked="false">
<TrustStore>
- <Location>trust.jks</Location>
+
<Location>../../webapp/WEB-INF/classes/conf/trust.jks</Location>
<Type>JKS</Type>
<Password>password</Password>
</TrustStore>
@@ -211,7 +211,7 @@
<parameter name="non-blocking" locked="false">true</parameter>
<parameter name="keystore" locked="false">
<KeyStore>
- <Location>identity.jks</Location>
+
<Location>../../webapp/WEB-INF/classes/conf/identity.jks</Location>
<Type>JKS</Type>
<Password>password</Password>
<KeyPassword>password</KeyPassword>
@@ -219,7 +219,7 @@
</parameter>
<parameter name="truststore" locked="false">
<TrustStore>
- <Location>trust.jks</Location>
+
<Location>../../webapp/WEB-INF/classes/conf/trust.jks</Location>
<Type>JKS</Type>
<Password>password</Password>
</TrustStore>
_______________________________________________
Esb-java-dev mailing list
[email protected]
http://wso2.org/cgi-bin/mailman/listinfo/esb-java-dev