User: pra
Date: 01/03/02 06:57:57
Modified: src/main/org/jboss/ejb/plugins/jms JMSContainerInvoker.java
Log:
Added support for failover; if a connection to the JMS server is lost, the container
will try to reconnect in an endless loop until the connection commes back, bean is
undeployed or the server is shut down.
Revision Changes Path
1.8 +97 -5
jboss/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvoker.java
Index: JMSContainerInvoker.java
===================================================================
RCS file:
/products/cvs/ejboss/jboss/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvoker.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -r1.7 -r1.8
--- JMSContainerInvoker.java 2001/02/28 09:25:42 1.7
+++ JMSContainerInvoker.java 2001/03/02 14:57:57 1.8
@@ -59,7 +59,7 @@
* @author Rickard �berg ([EMAIL PROTECTED])
* @author <a href="mailto:[EMAIL PROTECTED]">Sebastien
Alborini</a>
* @author <a href="mailto:[EMAIL PROTECTED]">Marc Fleury</a>
- * @version $Revision: 1.7 $
+ * @version $Revision: 1.8 $
*/
public class JMSContainerInvoker implements
ContainerInvoker, XmlLoadable
@@ -96,6 +96,8 @@
protected Connection connection;
protected ConnectionConsumer connectionConsumer;
protected TxManager tm;
+ protected ServerSessionPool pool;
+ protected ExceptionListenerImpl exListener;
// Static --------------------------------------------------------
@@ -233,7 +235,7 @@
// Lookup destination
Topic topic = (Topic)context.lookup(destinationJNDI);
- ServerSessionPool pool = poolFactory.
+ pool = poolFactory.
getServerSessionPool(
topicConnection,
maxPoolSize,
@@ -297,7 +299,7 @@
// Lookup destination
Queue queue = (Queue)context.lookup(destinationJNDI);
- ServerSessionPool pool = poolFactory.
+ pool = poolFactory.
getServerSessionPool(
queueConnection,
maxPoolSize,
@@ -318,6 +320,8 @@
connection = queueConnection;
Logger.debug("Queue connectionConsumer set up");
}
+ exListener = new ExceptionListenerImpl(this);
+ connection.setExceptionListener(exListener);
}
public void start()
@@ -331,13 +335,49 @@
public void stop()
{
Logger.debug("Stopping JMSContainerInvoker");
+ if (exListener != null)
+ exListener.stop();
+ innerStop();
+ }
+
+ protected void innerStop() {
+
+ try {
+ if (connection != null)
+ connection.setExceptionListener(null);
+
+ }catch(Exception se) {
+ Logger.log("Could not set JMSContainerInvoker ExceptionListener to null :"
+ se);
+ }
+
try {
+ if(connection == null)
+ connection.stop();
+ }catch(Exception cs) {
+ Logger.log("Could not stop JMSContainerInvoker consumer:" + cs);
+ }
+ try {
+ if (pool instanceof org.jboss.jms.asf.StdServerSessionPool) {
+ org.jboss.jms.asf.StdServerSessionPool p =
+ (org.jboss.jms.asf.StdServerSessionPool)pool;
+ p.clear();
+ }
+ }catch(Exception pe) {
+ Logger.log("Could not cleat ServerSessionPool:" + pe);
+ }
+
+
+ try {
if (connectionConsumer != null)
connectionConsumer.close();
+ }catch(Exception ex) {
+ Logger.log("Could not close JMSContainerInvoker consumer:" + ex);
+ }
+ try {
if (connection != null)
connection.close();
- }catch(JMSException ex) {
- Logger.log("Could not stop JMSContainerInvoker:" + ex);
+ }catch(Exception ex) {
+ Logger.log("Could not close JMSContainerInvoker connection:" + ex);
}
}
@@ -423,6 +463,58 @@
ex.printStackTrace();
}
+ }
+
+ }
+
+ /**
+ * ExceptionListener for failover handling.
+ */
+ class ExceptionListenerImpl implements ExceptionListener {
+ JMSContainerInvoker invoker = null;
+ Thread currentThread = null;
+ boolean notStoped = true;
+
+ ExceptionListenerImpl(JMSContainerInvoker invoker) {
+ this.invoker = invoker;
+ }
+
+ void stop() {
+ notStoped = false;
+ if ( currentThread != null) currentThread.interrupt();
+ }
+
+ public void onException(JMSException ex) {
+ currentThread = Thread.currentThread();
+ try {
+ Logger.warning("MDB lost connection to provider");
+ boolean tryIt = true;
+ while(tryIt && notStoped) {
+ Logger.log("MDB Trying to reconnect...");
+ try {
+ try {
+ Thread.sleep(10000);
+ }catch(InterruptedException ie) { tryIt=false; return;}
+ //try {
+ invoker.innerStop();
+ invoker.init();
+ invoker.start();
+ tryIt = false;
+ Logger.log("OK - reconnected");
+ //return;
+ }catch(Exception e) {
+ Logger.log("MDB error reconnecting: " +e);
+ }
+ }
+
+
+ //topicConnection.close();
+ }catch(Exception je) {
+ Logger.warning("Could not restart connection " + je);
+ je.printStackTrace();
+ } finally {
+ currentThread = null;
+ }
}
}