User: starksm
Date: 01/07/08 14:19:29
Modified: src/main/org/jboss/ejb/plugins/jms Tag: Branch_2_4
JMSContainerInvoker.java
Log:
Merge the latest JBossMQ changes into the 2.4 release
Revision Changes Path
No revision
No revision
1.12.4.1 +560 -572
jboss/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvoker.java
Index: JMSContainerInvoker.java
===================================================================
RCS file:
/cvsroot/jboss/jboss/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvoker.java,v
retrieving revision 1.12
retrieving revision 1.12.4.1
diff -u -r1.12 -r1.12.4.1
--- JMSContainerInvoker.java 2001/06/01 06:06:37 1.12
+++ JMSContainerInvoker.java 2001/07/08 21:19:29 1.12.4.1
@@ -1,572 +1,560 @@
-/*
- * jBoss, the OpenSource EJB server
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.ejb.plugins.jms;
-
-import java.util.Collection;
-import java.util.Hashtable;
-import java.lang.reflect.Method;
-import java.security.Principal;
-
-import javax.jms.*;
-
-import javax.ejb.EJBMetaData;
-import javax.ejb.EJBHome;
-import javax.ejb.EJBObject;
-
-import javax.naming.Name;
-import javax.naming.InitialContext;
-import javax.naming.Context;
-import javax.naming.NamingException;
-import javax.naming.NameNotFoundException;
-
-import javax.transaction.Status;
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
-
-
-import org.jboss.ejb.MethodInvocation;
-import org.jboss.ejb.Container;
-import org.jboss.ejb.ContainerInvokerContainer;
-import org.jboss.ejb.Interceptor;
-import org.jboss.ejb.ContainerInvoker;
-import org.jboss.ejb.DeploymentException;
-
-import org.jboss.tm.TxManager;
-
-import org.jboss.logging.Logger;
-import org.jboss.metadata.XmlLoadable;
-import org.jboss.metadata.MetaData;
-import org.jboss.metadata.MessageDrivenMetaData;
-
-import org.jboss.jms.jndi.JMSProviderAdapter;
-import org.jboss.jms.asf.ServerSessionPoolFactory;
-
-import org.w3c.dom.Element;
-
-import javax.management.MBeanServerFactory;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
-/**
- * ContainerInvoker for JMS MessageDrivenBeans, based on JRMPContainerInvoker.
- * <description>
- *
- * @see <related>
- * @author Peter Antman ([EMAIL PROTECTED])
- * @author Rickard Öberg ([EMAIL PROTECTED])
- * @author <a href="mailto:[EMAIL PROTECTED]">Sebastien Alborini</a>
- * @author <a href="mailto:[EMAIL PROTECTED]">Marc Fleury</a>
- * @version $Revision: 1.12 $
- */
-public class JMSContainerInvoker implements
-ContainerInvoker, XmlLoadable
-{
- // Constants -----------------------------------------------------
- static final String msgInterface = "javax.jms.MessageListener";
- static final String msgMethod = "onMessage";
- static final String msgArgument = "javax.jms.Message";
- static Method listenerMethod;
- static {
- // Get the method
- try {
- Class msgInterfaceClass = Class.forName(msgInterface);
- Class argumentClass = Class.forName(msgArgument);
- listenerMethod = msgInterfaceClass.getMethod(msgMethod, new Class[]
{argumentClass});
-
- } catch(ClassNotFoundException ex) {
- Logger.error("Could not the classes for message interface" + msgInterface
+ ": " + ex);
- }catch(NoSuchMethodException ex) {
- Logger.error("Could not get the method for message interface" + msgMethod
+ ": " + ex);
- }
- };
-
- // Attributes ----------------------------------------------------
- protected boolean optimize = false;
- protected int maxMessagesNr = 1;
- protected int maxPoolSize = 15;
- protected String jMSProviderAdapterJNDI;
- protected String serverSessionPoolFactoryJNDI;
- protected int acknowledgeMode;
-
- protected Container container;
-
- protected Connection connection;
- protected ConnectionConsumer connectionConsumer;
- protected TxManager tm;
- protected ServerSessionPool pool;
- protected ExceptionListenerImpl exListener;
-
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
- public void setOptimized(boolean optimize)
- {
- this.optimize = optimize;
- //DEBUG Logger.debug("Container Invoker optimize set to
'"+optimize+"'");
- }
-
- public boolean isOptimized()
- {
- //DEBUG Logger.debug("Optimize in action: '"+optimize+"'");
- return optimize;
- }
-
- public EJBMetaData getEJBMetaData()
- {
- throw new Error("Not valid for MessageDriven beans");
- }
-
- // ContainerInvoker implementation
- public EJBHome getEJBHome() {throw new Error("Not valid for MessageDriven
beans");}
-
- public EJBObject getStatelessSessionEJBObject() {throw new Error("Not valid for
MessageDriven beans");}
-
- public EJBObject getStatefulSessionEJBObject(Object id) {throw new Error("Not
valid for MessageDriven beans");}
-
- public EJBObject getEntityEJBObject(Object id) {throw new Error("Not valid for
MessageDriven beans");}
-
- public Collection getEntityCollection(Collection ids) {throw new Error("Not
valid for MessageDriven beans");}
-
-
- public Object invoke(Object id, Method m, Object[] args, Transaction tx,
- Principal identity, Object credential )
- throws Exception
- {
-
- MethodInvocation mi = new MethodInvocation(id, m, args, tx, identity,
credential);
-
- // Set the right context classloader
- ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
- Thread.currentThread().setContextClassLoader(container.getClassLoader());
-
-
- try
- {
- return container.invoke(mi);
- } finally {
- Thread.currentThread().setContextClassLoader(oldCl);
- }
- }
-
- // ContainerService implementation -------------------------------
- public void setContainer(Container con)
- {
- this.container = con;
- //jndiName = container.getBeanMetaData().getJndiName();
- }
-
- public void init()
- throws Exception
- {
-
- // Store TM reference locally - should we test for CMT Required
- tm = (TxManager) container.getTransactionManager();
-
- /*
- * Get configuration information - from EJB-xml
- */
- MessageDrivenMetaData config =
((MessageDrivenMetaData)container.getBeanMetaData());
-
- // Selector
- String messageSelector = config.getMessageSelector();
- // Queue or Topic
- String destinationType = config.getDestinationType();
-
- // Is containermanages TX
- boolean isContainerManagedTx = config.isContainerManagedTx();
-
- acknowledgeMode = config.getAcknowledgeMode();
-
- // Get configuration data from jboss.xml
- String destinationJNDI = config.getDestinationJndiName();
- String user = config.getUser();
-
-
- /*
- * Set upp JNDI
- * connect to the JNDI server and get a reference to
- * root context
- */
- Context context = null;
-
- Context jbossContext = new InitialContext();
- JMSProviderAdapter adapter =
(JMSProviderAdapter)jbossContext.lookup(jMSProviderAdapterJNDI);
- context = adapter.getInitialContext();
-
-
- // if we can't get the root context then exit with an exception
- if (context == null)
- {
- throw new RuntimeException("Failed to get the root context");
- }
-
- // Set up pool
- ServerSessionPoolFactory poolFactory =
(ServerSessionPoolFactory)jbossContext.lookup(serverSessionPoolFactoryJNDI);
-
- // jndiSuffix is merely the name that the user has given the MDB.
- // since the jndi name contains the message type I have to split at the "/"
- // if there is no slash then I use the entire jndi name.....
- String jndiSuffix = "";
- if(destinationJNDI != null){
- int indexOfSlash = destinationJNDI.indexOf("/");
- if(indexOfSlash != -1){
- jndiSuffix = destinationJNDI.substring(indexOfSlash+1);
- }else{
- jndiSuffix = destinationJNDI;
- }
-
- // if the jndi name from jboss.xml is null then lets use the ejbName
- }else{
- jndiSuffix = config.getEjbName();
- }
- MBeanServer server =
(MBeanServer)MBeanServerFactory.findMBeanServer(null).iterator().next();
-
- if (destinationType.equals("javax.jms.Topic"))
- {
- Logger.debug("Got destination type Topic for " + config.getEjbName());
-
- // All classes are different between topics and queues!!
- TopicConnectionFactory topicFactory =
- (TopicConnectionFactory)context.
- lookup(adapter.getTopicFactoryName());
- // Do we have a user - this is messy code (should be done for queues to)
- TopicConnection topicConnection;
- if(user != null)
- {
- Logger.debug("Creating topic connection with user: " +
- user + " passwd: " + config.getPasswd());
- topicConnection = topicFactory.
- createTopicConnection(user, config.getPasswd());
- }
- else
- {
- topicConnection = topicFactory.createTopicConnection();
- }
-
- // Lookup destination
- // First Try a lookup.
- // If that lookup fails then try to contact the MBeanServer and inoke a
new...
- // Then do lookup again..
- String topicJndi = "topic/"+jndiSuffix;
- Topic topic;
- try{
- topic = (Topic)context.lookup(topicJndi);
- }catch(NamingException ne){
- Logger.log("JndiName not found:"+topicJndi + "...attempting
to recover");
- server.invoke(new ObjectName("JMS","service","JMSServer"),
"newTopic", new Object[]{jndiSuffix}, new String[] {"java.lang.String"});
- topic = (Topic)context.lookup(topicJndi);
- }
-
- pool = poolFactory.
- getServerSessionPool(
- topicConnection,
- maxPoolSize,
- //Transacted
- true,
- acknowledgeMode,
- new MessageListenerImpl(this));
-
- // To be no-durable or durable
- if (config.getSubscriptionDurability() !=
- MessageDrivenMetaData.DURABLE_SUBSCRIPTION)
- {
- // Create non durable
- connectionConsumer = topicConnection.
- createConnectionConsumer(
- topic,
- messageSelector,
- pool,
- maxMessagesNr);
- }
- else
- {
- //Durable subscription
- String clientId = config.getClientId();
- String durableName = clientId != null ?
- clientId:
- config.getEjbName();
- connectionConsumer = topicConnection.
- createDurableConnectionConsumer(
- topic,
- durableName,
- messageSelector,
- pool,
- maxMessagesNr);
- }
- // set global connection, so we have something to start() and close()
- connection = topicConnection;
- Logger.debug("Topic connectionConsumer set up");
-
- }
- else if(destinationType.equals("javax.jms.Queue"))
- {
- Logger.debug("Got destination type Queue");
- QueueConnectionFactory queueFactory =
-
(QueueConnectionFactory)context.lookup(adapter.getQueueFactoryName());
-
- // Do we have a user
- QueueConnection queueConnection;
- if (user != null)
- {
- queueConnection = queueFactory.
- createQueueConnection(
- user,
- config.getPasswd());
- }
- else
- {
- queueConnection = queueFactory.createQueueConnection();
- }
-
- // Lookup destination
- // First Try a lookup.
- // If that lookup fails then try to contact the MBeanServer and inoke a
new...
- // Then do lookup again..
- String queueJndi = "queue/"+jndiSuffix;
- Queue queue;
- try
- {
- queue = (Queue)context.lookup(queueJndi);
- }
- catch(NamingException ne){
- Logger.log("JndiName not found:"+queueJndi + "...attempting to
recover");
- server.invoke(new ObjectName("JMS:service=JMSServer"), "newQueue",
new Object[]{jndiSuffix}, new String[] {"java.lang.String"});
- queue = (Queue)context.lookup(queueJndi);
- }
-
- pool = poolFactory.
- getServerSessionPool(
- queueConnection,
- maxPoolSize,
- //Transacted
- true,
- acknowledgeMode,
- new MessageListenerImpl(this));
-
-
- connectionConsumer = queueConnection.
- createConnectionConsumer(
- queue,
- messageSelector,
- pool,
- maxMessagesNr);
-
- // set global connection, so we have something to start() and close()
- connection = queueConnection;
- Logger.debug("Queue connectionConsumer set up");
- }
- }
-
- // Start the connection
- public void start()
- throws Exception
- {
- Logger.debug("Starting JMSContainerInvoker");
- exListener = new ExceptionListenerImpl(this);
- connection.setExceptionListener(exListener);
- connection.start();
- }
-
- // Stop the connection
- public void stop()
- {
- Logger.debug("Stopping JMSContainerInvoker");
- // Silence the exception listener
- if (exListener != null)
- exListener.stop();
- innerStop();
-
- }
-
- // Stop done from inside, we should not stop the exceptionListener in
- // inner stop
- protected void innerStop() {
- try {
- if (connection != null)
- connection.setExceptionListener(null);
-
- }catch(Exception se) {
- Logger.log("Could not set JMSContainerInvoker ExceptionListener to null :"
+ se);
- }
-
- // Stop the connection
- try {
- if(connection != null)
- connection.stop();
- }catch(Exception cs) {
- Logger.log("Could not stop JMS connection:" + cs);
- }
- }
-
-
- // Take down all fixtures
- public void destroy()
- {
- Logger.debug("Destroying JMSContainerInvoker");
- try {
- if (pool instanceof org.jboss.jms.asf.StdServerSessionPool) {
- org.jboss.jms.asf.StdServerSessionPool p =
- (org.jboss.jms.asf.StdServerSessionPool)pool;
- p.clear();
- }
- }catch(Exception pe) {
- Logger.log("Could not clear ServerSessionPool:" + pe);
- }
-
-
- try {
- if (connectionConsumer != null)
- connectionConsumer.close();
- }catch(Exception ex) {
- Logger.log("Could not close JMSContainerInvoker consumer:" + ex);
- }
- try {
- if (connection != null)
- connection.close();
- }catch(Exception ex) {
- Logger.log("Could not close JMSContainerInvoker connection:" + ex);
- }
- }
-
- // XmlLoadable implementation
- public void importXml(Element element) throws DeploymentException
- {
-
- try {
- String maxMessages =
MetaData.getElementContent(MetaData.getUniqueChild(element, "MaxMessages"));
- maxMessagesNr = Integer.parseInt(maxMessages);
- String maxSize =
MetaData.getElementContent(MetaData.getUniqueChild(element, "MaximumSize"));
- maxPoolSize = Integer.parseInt(maxSize);
- } catch(NumberFormatException e) {
- //Noop will take default value
- } catch(DeploymentException e) {
- //Noop will take default value
- }
-
- // If these are not found we will get a DeploymentException, I hope
- jMSProviderAdapterJNDI =
MetaData.getElementContent(MetaData.getUniqueChild(element, "JMSProviderAdapterJNDI"));
- serverSessionPoolFactoryJNDI =
MetaData.getElementContent(MetaData.getUniqueChild(element,
"ServerSessionPoolFactoryJNDI"));
-
- // Check java:/ prefix
- if (!jMSProviderAdapterJNDI.startsWith("java:/"))
- jMSProviderAdapterJNDI = "java:/"+jMSProviderAdapterJNDI;
- if (!serverSessionPoolFactoryJNDI.startsWith("java:/"))
- serverSessionPoolFactoryJNDI = "java:/"+serverSessionPoolFactoryJNDI;
- }
-
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
- class MessageListenerImpl implements MessageListener {
-
-
- JMSContainerInvoker invoker = null;
-
-
- MessageListenerImpl(JMSContainerInvoker invoker) {
- this.invoker = invoker;
- }
- public void onMessage(Message message)
- {
- /*
- Logger.debug(
- "[" + Thread.currentThread().hashCode() +
- "] Processing message " + message);
- */
- Object id;
- try {
- id = message.getJMSMessageID();
- }catch(javax.jms.JMSException ex) {
- id = "JMSContainerInvoke";
- }
- // Invoke, shuld we catch any Exceptions??
- try {
- invoker.invoke(
- // Object id - where used?
- id,
- // Method to invoke
- listenerMethod,
- //argument
- new Object[] {message},
- //Transaction
- tm.getTransaction(),
- //Principal
- null,
- //Cred
- null);
- }catch(Exception ex) {
- Logger.log("Exception in JMSCI message listener: : " + ex);
- ex.printStackTrace();
- }
-
- }
-
- }
-
- /**
- * ExceptionListener for failover handling.
- */
- class ExceptionListenerImpl implements ExceptionListener {
- JMSContainerInvoker invoker = null;
- Thread currentThread = null;
- boolean notStoped = true;
-
- ExceptionListenerImpl(JMSContainerInvoker invoker) {
- this.invoker = invoker;
- }
-
- void stop() {
- notStoped = false;
- if ( currentThread != null) currentThread.interrupt();
- }
-
- public void onException(JMSException ex) {
- currentThread = Thread.currentThread();
- try {
- Logger.warning("MDB lost connection to provider");
- boolean tryIt = true;
- while(tryIt && notStoped) {
- Logger.log("MDB Trying to reconnect...");
- try {
- try {
- Thread.sleep(10000);
- }catch(InterruptedException ie) { tryIt=false; return;}
- //try {
- // Reboot container
- invoker.innerStop();
- invoker.destroy();
- invoker.init();
- invoker.start();
- tryIt = false;
- Logger.log("OK - reconnected");
- //return;
- }catch(Exception e) {
- Logger.log("MDB error reconnecting: " +e);
- }
- }
-
-
- //topicConnection.close();
- }catch(Exception je) {
- Logger.warning("Could not restart connection " + je);
- je.printStackTrace();
- } finally {
- currentThread = null;
- }
- }
-
- }
-}
+/*
+ * jBoss, the OpenSource EJB server
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.ejb.plugins.jms;
+
+import java.util.Collection;
+import java.util.Hashtable;
+import java.lang.reflect.Method;
+import java.security.Principal;
+
+import javax.jms.*;
+
+import javax.ejb.EJBMetaData;
+import javax.ejb.EJBHome;
+import javax.ejb.EJBObject;
+
+import javax.naming.Name;
+import javax.naming.InitialContext;
+import javax.naming.Context;
+import javax.naming.NamingException;
+import javax.naming.NameNotFoundException;
+
+import javax.transaction.Status;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+
+
+import org.jboss.ejb.MethodInvocation;
+import org.jboss.ejb.Container;
+import org.jboss.ejb.ContainerInvokerContainer;
+import org.jboss.ejb.Interceptor;
+import org.jboss.ejb.ContainerInvoker;
+import org.jboss.ejb.DeploymentException;
+
+import org.jboss.logging.Logger;
+import org.jboss.metadata.XmlLoadable;
+import org.jboss.metadata.MetaData;
+import org.jboss.metadata.MessageDrivenMetaData;
+
+import org.jboss.jms.jndi.JMSProviderAdapter;
+import org.jboss.jms.asf.ServerSessionPoolFactory;
+
+import org.w3c.dom.Element;
+
+import javax.management.MBeanServerFactory;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+/**
+ * ContainerInvoker for JMS MessageDrivenBeans, based on JRMPContainerInvoker.
+ * <description>
+ *
+ * @see <related>
+ * @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a>.
+ * @author <a href="mailto:[EMAIL PROTECTED]">Rickard Öberg</a>
+ * @author <a href="mailto:[EMAIL PROTECTED]">Sebastien Alborini</a>
+ * @author <a href="mailto:[EMAIL PROTECTED]">Marc Fleury</a>
+ * @version $Revision: 1.12.4.1 $
+ */
+public class JMSContainerInvoker implements
+ContainerInvoker, XmlLoadable
+{
+ // Constants -----------------------------------------------------
+ static final String msgInterface = "javax.jms.MessageListener";
+ static final String msgMethod = "onMessage";
+ static final String msgArgument = "javax.jms.Message";
+ static Method listenerMethod;
+ static {
+ // Get the method
+ try {
+ Class msgInterfaceClass = Class.forName(msgInterface);
+ Class argumentClass = Class.forName(msgArgument);
+ listenerMethod = msgInterfaceClass.getMethod(msgMethod, new Class[]
{argumentClass});
+
+ } catch(ClassNotFoundException ex) {
+ Logger.error("Could not the classes for message interface" + msgInterface
+ ": " + ex);
+ }catch(NoSuchMethodException ex) {
+ Logger.error("Could not get the method for message interface" + msgMethod
+ ": " + ex);
+ }
+ }
+
+ // Attributes ----------------------------------------------------
+ protected boolean optimize = false;
+ protected int maxMessagesNr = 1;
+ protected int maxPoolSize = 15;
+ protected String jMSProviderAdapterJNDI;
+ protected String serverSessionPoolFactoryJNDI;
+ protected int acknowledgeMode;
+
+ protected Container container;
+
+ protected Connection connection;
+ protected ConnectionConsumer connectionConsumer;
+ protected TransactionManager tm;
+ protected ServerSessionPool pool;
+ protected ExceptionListenerImpl exListener;
+
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+ public void setOptimized(boolean optimize)
+ {
+ this.optimize = optimize;
+ //DEBUG Logger.debug("Container Invoker optimize set to
'"+optimize+"'");
+ }
+
+ public boolean isOptimized()
+ {
+ //DEBUG Logger.debug("Optimize in action: '"+optimize+"'");
+ return optimize;
+ }
+
+ public EJBMetaData getEJBMetaData()
+ {
+ throw new Error("Not valid for MessageDriven beans");
+ }
+
+ // ContainerInvoker implementation
+ public EJBHome getEJBHome() {throw new Error("Not valid for MessageDriven
beans");}
+
+ public EJBObject getStatelessSessionEJBObject() {throw new Error("Not valid for
MessageDriven beans");}
+
+ public EJBObject getStatefulSessionEJBObject(Object id) {throw new Error("Not
valid for MessageDriven beans");}
+
+ public EJBObject getEntityEJBObject(Object id) {throw new Error("Not valid for
MessageDriven beans");}
+
+ public Collection getEntityCollection(Collection ids) {throw new Error("Not
valid for MessageDriven beans");}
+
+
+ public Object invoke(Object id, Method m, Object[] args, Transaction tx,
+ Principal identity, Object credential )
+ throws Exception
+ {
+
+ MethodInvocation mi = new MethodInvocation(id, m, args, tx, identity,
credential);
+
+ // Set the right context classloader
+ ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(container.getClassLoader());
+
+
+ try
+ {
+ return container.invoke(mi);
+ } finally {
+ Thread.currentThread().setContextClassLoader(oldCl);
+ }
+ }
+
+ // ContainerService implementation -------------------------------
+ public void setContainer(Container con)
+ {
+ this.container = con;
+ //jndiName = container.getBeanMetaData().getJndiName();
+ }
+
+ public void init()
+ throws Exception
+ {
+
+ // Store TM reference locally - should we test for CMT Required
+ tm = container.getTransactionManager();
+
+ /*
+ * Get configuration information - from EJB-xml
+ */
+ MessageDrivenMetaData config =
((MessageDrivenMetaData)container.getBeanMetaData());
+
+ // Selector
+ String messageSelector = config.getMessageSelector();
+ // Queue or Topic
+ String destinationType = config.getDestinationType();
+
+ // Is containermanages TX
+ boolean isContainerManagedTx = config.isContainerManagedTx();
+
+ acknowledgeMode = config.getAcknowledgeMode();
+
+ // Get configuration data from jboss.xml
+ String destinationJNDI = config.getDestinationJndiName();
+ String user = config.getUser();
+
+
+ /*
+ * Set upp JNDI
+ * connect to the JNDI server and get a reference to
+ * root context
+ */
+ Context context = null;
+
+ Context jbossContext = new InitialContext();
+ JMSProviderAdapter adapter =
(JMSProviderAdapter)jbossContext.lookup(jMSProviderAdapterJNDI);
+ context = adapter.getInitialContext();
+
+
+ // if we can't get the root context then exit with an exception
+ if (context == null)
+ {
+ throw new RuntimeException("Failed to get the root context");
+ }
+
+ // Set up pool
+ ServerSessionPoolFactory poolFactory =
(ServerSessionPoolFactory)jbossContext.lookup(serverSessionPoolFactoryJNDI);
+
+ // jndiSuffix is merely the name that the user has given the MDB.
+ // since the jndi name contains the message type I have to split at the "/"
+ // if there is no slash then I use the entire jndi name.....
+ String jndiSuffix = "";
+ if(destinationJNDI != null){
+ int indexOfSlash = destinationJNDI.indexOf("/");
+ if(indexOfSlash != -1){
+ jndiSuffix = destinationJNDI.substring(indexOfSlash+1);
+ }else{
+ jndiSuffix = destinationJNDI;
+ }
+
+ // if the jndi name from jboss.xml is null then lets use the ejbName
+ }else{
+ jndiSuffix = config.getEjbName();
+ }
+ MBeanServer server =
(MBeanServer)MBeanServerFactory.findMBeanServer(null).iterator().next();
+
+ if (destinationType.equals("javax.jms.Topic"))
+ {
+ Logger.debug("Got destination type Topic for " + config.getEjbName());
+
+ // All classes are different between topics and queues!!
+ TopicConnectionFactory topicFactory =
+ (TopicConnectionFactory)context.
+ lookup(adapter.getTopicFactoryRef());
+ // Do we have a user - this is messy code (should be done for queues to)
+ TopicConnection topicConnection;
+ if(user != null)
+ {
+ Logger.debug("Creating topic connection with user: " +
+ user + " passwd: " + config.getPasswd());
+ topicConnection = topicFactory.
+ createTopicConnection(user, config.getPasswd());
+ }
+ else
+ {
+ topicConnection = topicFactory.createTopicConnection();
+ }
+
+ // Lookup destination
+ // First Try a lookup.
+ // If that lookup fails then try to contact the MBeanServer and inoke a
new...
+ // Then do lookup again..
+ String topicJndi = "topic/"+jndiSuffix;
+ Topic topic;
+ try{
+ topic = (Topic)context.lookup(topicJndi);
+ }catch(NamingException ne){
+ Logger.log("JndiName not found:"+topicJndi + "...attempting
to recover");
+ server.invoke(new ObjectName("JMS","service","JMSServer"),
"newTopic", new Object[]{jndiSuffix}, new String[] {"java.lang.String"});
+ topic = (Topic)context.lookup(topicJndi);
+ }
+
+ pool = poolFactory.
+ getServerSessionPool(
+ topicConnection,
+ maxPoolSize,
+ //Transacted
+ true,
+ acknowledgeMode,
+ new MessageListenerImpl(this));
+
+ // To be no-durable or durable
+ if (config.getSubscriptionDurability() !=
+ MessageDrivenMetaData.DURABLE_SUBSCRIPTION)
+ {
+ // Create non durable
+ connectionConsumer = topicConnection.
+ createConnectionConsumer(
+ topic,
+ messageSelector,
+ pool,
+ maxMessagesNr);
+ }
+ else
+ {
+ //Durable subscription
+ String clientId = config.getClientId();
+ String durableName = clientId != null ?
+ clientId:
+ config.getEjbName();
+ connectionConsumer = topicConnection.
+ createDurableConnectionConsumer(
+ topic,
+ durableName,
+ messageSelector,
+ pool,
+ maxMessagesNr);
+ }
+ // set global connection, so we have something to start() and close()
+ connection = topicConnection;
+ Logger.debug("Topic connectionConsumer set up");
+
+ }
+ else if(destinationType.equals("javax.jms.Queue"))
+ {
+ Logger.debug("Got destination type Queue");
+ QueueConnectionFactory queueFactory =
+
(QueueConnectionFactory)context.lookup(adapter.getQueueFactoryRef());
+
+ // Do we have a user
+ QueueConnection queueConnection;
+ if (user != null)
+ {
+ queueConnection = queueFactory.
+ createQueueConnection(
+ user,
+ config.getPasswd());
+ }
+ else
+ {
+ queueConnection = queueFactory.createQueueConnection();
+ }
+
+ // Lookup destination
+ // First Try a lookup.
+ // If that lookup fails then try to contact the MBeanServer and inoke a
new...
+ // Then do lookup again..
+ String queueJndi = "queue/"+jndiSuffix;
+ Queue queue;
+ try
+ {
+ queue = (Queue)context.lookup(queueJndi);
+ }
+ catch(NamingException ne){
+ Logger.log("JndiName not found:"+queueJndi + "...attempting to
recover");
+ server.invoke(new ObjectName("JMS:service=JMSServer"), "newQueue",
new Object[]{jndiSuffix}, new String[] {"java.lang.String"});
+ queue = (Queue)context.lookup(queueJndi);
+ }
+
+ pool = poolFactory.
+ getServerSessionPool(
+ queueConnection,
+ maxPoolSize,
+ //Transacted
+ true,
+ acknowledgeMode,
+ new MessageListenerImpl(this));
+
+
+ connectionConsumer = queueConnection.
+ createConnectionConsumer(
+ queue,
+ messageSelector,
+ pool,
+ maxMessagesNr);
+
+ // set global connection, so we have something to start() and close()
+ connection = queueConnection;
+ Logger.debug("Queue connectionConsumer set up");
+ }
+ }
+
+ // Start the connection
+ public void start()
+ throws Exception
+ {
+ Logger.debug("Starting JMSContainerInvoker");
+ exListener = new ExceptionListenerImpl(this);
+ connection.setExceptionListener(exListener);
+ connection.start();
+ }
+
+ // Stop the connection
+ public void stop()
+ {
+ Logger.debug("Stopping JMSContainerInvoker");
+ // Silence the exception listener
+ if (exListener != null)
+ exListener.stop();
+ innerStop();
+
+ }
+
+ // Stop done from inside, we should not stop the exceptionListener in
+ // inner stop
+ protected void innerStop() {
+ try {
+ if (connection != null)
+ connection.setExceptionListener(null);
+
+ }catch(Exception se) {
+ Logger.log("Could not set JMSContainerInvoker ExceptionListener to null :"
+ se);
+ }
+
+ // Stop the connection
+ try {
+ if(connection != null)
+ connection.stop();
+ }catch(Exception cs) {
+ Logger.log("Could not stop JMS connection:" + cs);
+ }
+ }
+
+
+ // Take down all fixtures
+ public void destroy()
+ {
+ Logger.debug("Destroying JMSContainerInvoker");
+ try {
+ if (pool instanceof org.jboss.jms.asf.StdServerSessionPool) {
+ org.jboss.jms.asf.StdServerSessionPool p =
+ (org.jboss.jms.asf.StdServerSessionPool)pool;
+ p.clear();
+ }
+ }catch(Exception pe) {
+ Logger.log("Could not clear ServerSessionPool:" + pe);
+ }
+
+
+ try {
+ if (connectionConsumer != null)
+ connectionConsumer.close();
+ }catch(Exception ex) {
+ Logger.log("Could not close JMSContainerInvoker consumer:" + ex);
+ }
+ try {
+ if (connection != null)
+ connection.close();
+ }catch(Exception ex) {
+ Logger.log("Could not close JMSContainerInvoker connection:" + ex);
+ }
+ }
+
+ // XmlLoadable implementation
+ public void importXml(Element element) throws DeploymentException
+ {
+
+ try {
+ String maxMessages =
MetaData.getElementContent(MetaData.getUniqueChild(element, "MaxMessages"));
+ maxMessagesNr = Integer.parseInt(maxMessages);
+ String maxSize =
MetaData.getElementContent(MetaData.getUniqueChild(element, "MaximumSize"));
+ maxPoolSize = Integer.parseInt(maxSize);
+ } catch(NumberFormatException e) {
+ //Noop will take default value
+ } catch(DeploymentException e) {
+ //Noop will take default value
+ }
+
+ // If these are not found we will get a DeploymentException, I hope
+ jMSProviderAdapterJNDI =
MetaData.getElementContent(MetaData.getUniqueChild(element, "JMSProviderAdapterJNDI"));
+ serverSessionPoolFactoryJNDI =
MetaData.getElementContent(MetaData.getUniqueChild(element,
"ServerSessionPoolFactoryJNDI"));
+
+ // Check java:/ prefix
+ if (!jMSProviderAdapterJNDI.startsWith("java:/"))
+ jMSProviderAdapterJNDI = "java:/"+jMSProviderAdapterJNDI;
+ if (!serverSessionPoolFactoryJNDI.startsWith("java:/"))
+ serverSessionPoolFactoryJNDI = "java:/"+serverSessionPoolFactoryJNDI;
+ }
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+ class MessageListenerImpl implements MessageListener {
+
+
+ JMSContainerInvoker invoker = null;
+
+
+ MessageListenerImpl(JMSContainerInvoker invoker) {
+ this.invoker = invoker;
+ }
+ public void onMessage(Message message)
+ {
+ /*
+ Logger.debug(
+ "[" + Thread.currentThread().hashCode() +
+ "] Processing message " + message);
+ */
+ Object id;
+ try {
+ id = message.getJMSMessageID();
+ }catch(javax.jms.JMSException ex) {
+ id = "JMSContainerInvoke";
+ }
+ // Invoke, shuld we catch any Exceptions??
+ try {
+ invoker.invoke(
+ // Object id - where used?
+ id,
+ // Method to invoke
+ listenerMethod,
+ //argument
+ new Object[] {message},
+ //Transaction
+ tm.getTransaction(),
+ //Principal
+ null,
+ //Cred
+ null);
+ }catch(Exception ex) {
+ Logger.log("Exception in JMSCI message listener: : " + ex);
+ ex.printStackTrace();
+ }
+
+ }
+
+ }
+
+ /**
+ * ExceptionListener for failover handling.
+ */
+ class ExceptionListenerImpl implements ExceptionListener {
+ JMSContainerInvoker invoker = null;
+ Thread currentThread = null;
+ boolean notStoped = true;
+
+ ExceptionListenerImpl(JMSContainerInvoker invoker) {
+ this.invoker = invoker;
+ }
+
+ void stop() {
+ notStoped = false;
+ if ( currentThread != null) currentThread.interrupt();
+ }
+
+ public void onException(JMSException ex) {
+ currentThread = Thread.currentThread();
+
+ Logger.warning("MDB lost connection to provider");
+ boolean tryIt = true;
+ while(tryIt && notStoped) {
+ Logger.log("MDB Trying to reconnect...");
+ try {
+ try {
+ Thread.sleep(10000);
+ }catch(InterruptedException ie) { tryIt=false; return;}
+ // Reboot container
+ invoker.innerStop();
+ invoker.destroy();
+ invoker.init();
+ invoker.start();
+ tryIt = false;
+ Logger.log("OK - reconnected");
+ }catch(Exception e) {
+ Logger.log("MDB error reconnecting: " +e);
+ }
+ }
+ currentThread = null;
+ }
+
+ }
+}
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development