User: user57
Date: 01/07/20 19:08:02
Modified: src/main/org/jboss/ejb/plugins/jms JMSContainerInvoker.java
Log:
o Using ConnectionFactoryHelper to create connections
o Factored out some of the common Queue & Topic initialization so they can
both use the same code. Note there is still some work that could be done
here to clean things up.
o Documented things a bit more.
Revision Changes Path
1.18 +253 -177
jboss/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvoker.java
Index: JMSContainerInvoker.java
===================================================================
RCS file: /cvsroot/jboss/jboss/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvoker.java,v
retrieving revision 1.17
retrieving revision 1.18
diff -u -r1.17 -r1.18
--- JMSContainerInvoker.java 2001/07/09 21:14:28 1.17
+++ JMSContainerInvoker.java 2001/07/21 02:08:01 1.18
@@ -35,6 +35,7 @@
import org.apache.log4j.Category;
+import org.jboss.jms.ConnectionFactoryHelper;
import org.jboss.ejb.MethodInvocation;
import org.jboss.ejb.Container;
import org.jboss.ejb.ContainerInvokerContainer;
@@ -59,7 +60,7 @@
* @author <a href="mailto:[EMAIL PROTECTED]">Sebastien Alborini</a>
* @author <a href="mailto:[EMAIL PROTECTED]">Marc Fleury</a>
* @author <a href="mailto:[EMAIL PROTECTED]">Jason Dillon</a>
- * @version $Revision: 1.17 $
+ * @version $Revision: 1.18 $
*/
public class JMSContainerInvoker
implements ContainerInvoker, XmlLoadable
@@ -108,20 +109,17 @@
// Public --------------------------------------------------------
- public void setOptimized(boolean optimize)
- {
+ public void setOptimized(final boolean optimize) {
log.debug("Container Invoker optimize set to " + optimize);
this.optimize = optimize;
}
- public boolean isOptimized()
- {
+ public boolean isOptimized() {
log.debug("Optimize in action: " + optimize);
return optimize;
}
- public EJBMetaData getEJBMetaData()
- {
+ public EJBMetaData getEJBMetaData() {
throw new Error("Not valid for MessageDriven beans");
}
@@ -171,7 +169,12 @@
}
// ContainerService implementation -------------------------------
-
+
+ /**
+ * Set the container for which this is an invoker to.
+ *
+ * @param container The container for which this is an invoker to.
+ */
public void setContainer(final Container container)
{
this.container = container;
@@ -180,6 +183,8 @@
/**
* Return the JMSProviderAdapter that should be used.
+ *
+ * @return The JMSProviderAdapter to use.
*/
protected JMSProviderAdapter getJMSProviderAdapter()
throws NamingException
@@ -195,23 +200,160 @@
}
/**
- * Return the ServerSessionPoolFactory that should be used.
+ * Create a new connection from the given factory.
+ *
+ * @param factory An object that implements QueueConnectionFactory,
+ * XAQueueConnectionFactory, TopicConnectionFactory or
+ * XATopicConnectionFactory.
+ * @param username The username to use or null for no user.
+ * @param password The password for the given username or null if no
+ * @return A new connection from the given factory.
+ *
+ * @throws JMSException Failed to create connection.
*/
- protected ServerSessionPoolFactory getServerSessionPoolFactory()
- throws NamingException
+ protected Connection createConnection(final Object factory,
+ final String username,
+ final String password)
+ throws JMSException
+ {
+ log.debug("attempting to create connection from factory: " + factory);
+ connection = ConnectionFactoryHelper.createConnection
+ (factory, username, password);
+ log.debug("created connection: " + connection);
+
+ return connection;
+ }
+
+ /**
+ * Parse the JNDI suffix from the given JNDI name.
+ *
+ * @param jndiname The JNDI name used to lookup the destination.
+ * @param defaultSuffix The default suffix to use if parsing fails.
+ * @return The parsed suffix or the defaultSuffix
+ */
+ protected String parseJndiSuffix(final String jndiname,
+ final String defautSuffix)
+ {
+ // jndiSuffix is merely the name that the user has given the MDB.
+ // since the jndi name contains the message type I have to split
+ // at the "/" if there is no slash then I use the entire jndi name...
+ String jndiSuffix = "";
+ if (jndiname != null) {
+ int indexOfSlash = jndiname.indexOf("/");
+ if (indexOfSlash != -1) {
+ jndiSuffix = jndiname.substring(indexOfSlash+1);
+ } else {
+ jndiSuffix = jndiname;
+ }
+ }
+ else {
+ // if the jndi name from jboss.xml is null then lets use the ejbName
+ jndiSuffix = defautSuffix;
+ }
+
+ return jndiSuffix;
+ }
+
+ /**
+ * Create and or lookup a JMS destination.
+ *
+ * @param type Either javax.jms.Queue or javax.jms.Topic.
+ * @param ctx The naming context to lookup destinations from.
+ * @param jndiName The name to use when looking up destinations.
+ * @param jndiSuffix The name to use when creating destinations.
+ * @return The destination.
+ *
+ * @throws IllegalArgumentException Type is not Queue or Topic.
+ */
+ protected Destination createDestination(final Class type,
+ final Context ctx,
+ final String jndiName,
+ final String jndiSuffix)
+ throws Exception
{
+ try {
+ // first try to look it up
+ return (Destination)ctx.lookup(jndiName);
+ }
+ catch (NamingException e) {
+ // if the lookup failes, the try to create it
+ log.warn("destination not found: " + jndiName, e);
+
+ //
+ // attempt to create the destination (note, this is very
+ // very, very unportable).
+ //
+ MBeanServer server = (MBeanServer)
+ MBeanServerFactory.findMBeanServer(null).iterator().next();
+
+ if (type == Topic.class) {
+ server.invoke(new ObjectName("JMS","service","JMSServer"),
+ "newTopic", new Object[]{ jndiSuffix },
+ new String[] {"java.lang.String"});
+ }
+ else if (type == Queue.class) {
+ server.invoke(new ObjectName("JMS","service","JMSServer"),
+ "newQueue", new Object[]{ jndiSuffix },
+ new String[] {"java.lang.String"});
+ }
+ else {
+ // type was not a Topic or Queue, bad user
+ throw new IllegalArgumentException
+ ("expected javax.jms.Queue or javax.jms.Topic for type: " +
+ type);
+ }
+
+ return (Destination)ctx.lookup(jndiName);
+ }
+ }
+
+ /**
+ * Create a server session pool for the given connection.
+ *
+ * @param connection The connection to use.
+ * @param maxSession The maximum number of sessions.
+ * @param isTransacted True if the sessions are transacted.
+ * @param ack The session acknowledgement mode.
+ * @param listener The message listener.
+ * @return A server session pool.
+ *
+ * @throws JMSException
+ */
+ protected ServerSessionPool
+ createSessionPool(final Connection connection,
+ final int maxSession,
+ final boolean isTransacted,
+ final int ack,
+ final MessageListener listener)
+ throws NamingException, JMSException
+ {
+ ServerSessionPool pool;
Context context = new InitialContext();
+
try {
+ // first lookup the factory
log.debug("looking up session pool factory: " +
serverSessionPoolFactoryJNDI);
- return (ServerSessionPoolFactory)
+ ServerSessionPoolFactory factory = (ServerSessionPoolFactory)
context.lookup(serverSessionPoolFactoryJNDI);
+
+ // the create the pool
+ pool = factory.getServerSessionPool
+ (connection, maxSession, isTransacted, ack, listener);
}
finally {
context.close();
}
+
+ return pool;
}
+ /**
+ * Initialize the container invoker. Sets up a connection, a server
+ * session pool and a connection consumer for the configured destination.
+ *
+ * @throws Exception Failed to initalize.
+ */
public void init() throws Exception
{
log.debug("initializing");
@@ -229,121 +371,74 @@
// Queue or Topic
String destinationType = config.getDestinationType();
- // Is containermanages TX
+ // Is containermanages TX (not used?)
boolean isContainerManagedTx = config.isContainerManagedTx();
-
acknowledgeMode = config.getAcknowledgeMode();
// Get configuration data from jboss.xml
String destinationJNDI = config.getDestinationJndiName();
String user = config.getUser();
-
- // Set upp JNDI
- // connect to the JNDI server and get a reference to root context
- Context context = null;
-
+ String password = config.getPasswd();
+
+ // Get the JMS provider
JMSProviderAdapter adapter = getJMSProviderAdapter();
log.debug("provider adapter: " + adapter);
-
- context = adapter.getInitialContext();
+
+ // Connect to the JNDI server and get a reference to root context
+ Context context = adapter.getInitialContext();
log.debug("context: " + context);
// if we can't get the root context then exit with an exception
- if (context == null)
- {
+ if (context == null) {
throw new RuntimeException("Failed to get the root context");
}
-
- // Set up pool
- ServerSessionPoolFactory poolFactory = getServerSessionPoolFactory();
- // jndiSuffix is merely the name that the user has given the MDB.
- // since the jndi name contains the message type I have to split
- // at the "/" if there is no slash then I use the entire jndi name...
- String jndiSuffix = "";
- if (destinationJNDI != null) {
- int indexOfSlash = destinationJNDI.indexOf("/");
- if (indexOfSlash != -1) {
- jndiSuffix = destinationJNDI.substring(indexOfSlash+1);
- } else {
- jndiSuffix = destinationJNDI;
- }
- // if the jndi name from jboss.xml is null then lets use the ejbName
- } else {
- jndiSuffix = config.getEjbName();
- }
+ // Get the JNDI suffix of the destination
+ String jndiSuffix = parseJndiSuffix(destinationJNDI,
+ config.getEjbName());
log.debug("jndiSuffix: " + jndiSuffix);
- MBeanServer server = (MBeanServer)
- MBeanServerFactory.findMBeanServer(null).iterator().next();
-
if (destinationType.equals("javax.jms.Topic"))
{
log.debug("Got destination type Topic for " + config.getEjbName());
-
- // All classes are different between topics and queues!!
- TopicConnectionFactory topicFactory =
- (TopicConnectionFactory)context.
- lookup(adapter.getTopicFactoryRef());
-
- // Do we have a user - this is messy code (should be done for queues to)
- TopicConnection topicConnection;
- if(user != null)
- {
- log.debug("Creating topic connection with user: " +
- user + " passwd: " + config.getPasswd());
- topicConnection = topicFactory.
- createTopicConnection(user, config.getPasswd());
- }
- else
- {
- topicConnection = topicFactory.createTopicConnection();
- }
-
- // Lookup destination
- // First Try a lookup.
- // If that lookup fails then try to contact the MBeanServer and inoke a new...
- // Then do lookup again..
- String topicJndi = "topic/"+jndiSuffix;
- Topic topic;
- try {
- topic = (Topic)context.lookup(topicJndi);
- } catch(NamingException ne) {
- log.error("JndiName not found:"+topicJndi +
- "...attempting to recover", ne);
- server.invoke(new ObjectName("JMS","service","JMSServer"),
- "newTopic", new Object[]{jndiSuffix},
- new String[] {"java.lang.String"});
- topic = (Topic)context.lookup(topicJndi);
- }
-
- pool = poolFactory.getServerSessionPool
- (topicConnection,
- maxPoolSize,
- //Transacted
- true,
- acknowledgeMode,
- new MessageListenerImpl(this));
+
+ // create a topic connection
+ Object factory = context.lookup(adapter.getTopicFactoryRef());
+ TopicConnection tConnection =
+ (TopicConnection)createConnection(factory, user, password);
+
+ // lookup or create the destination topic
+ Topic topic =
+ (Topic)createDestination(Topic.class,
+ context,
+ "topic/" + jndiSuffix,
+ jndiSuffix);
+
+ // set up the server session pool
+ pool = createSessionPool(tConnection,
+ maxPoolSize,
+ true, // tx
+ acknowledgeMode,
+ new MessageListenerImpl(this));
// To be no-durable or durable
if (config.getSubscriptionDurability() !=
MessageDrivenMetaData.DURABLE_SUBSCRIPTION)
{
// Create non durable
- connectionConsumer = topicConnection.
+ connectionConsumer = tConnection.
createConnectionConsumer(topic,
messageSelector,
pool,
maxMessagesNr);
}
- else
- {
+ else {
//Durable subscription
String clientId = config.getClientId();
- String durableName = clientId != null ? clientId:
-
- config.getEjbName();
- connectionConsumer = topicConnection.
+ String durableName =
+ clientId != null ? clientId: config.getEjbName();
+
+ connectionConsumer = tConnection.
createDurableConnectionConsumer(topic,
durableName,
messageSelector,
@@ -351,71 +446,45 @@
maxMessagesNr);
}
- // set global connection, so we have something to
- // start() and close()
- connection = topicConnection;
log.debug("Topic connectionConsumer set up");
-
}
else if (destinationType.equals("javax.jms.Queue"))
{
- log.debug("Got destination type Queue");
- QueueConnectionFactory queueFactory =
- (QueueConnectionFactory)context.lookup(adapter.getQueueFactoryRef());
-
- // Do we have a user
- QueueConnection queueConnection;
- if (user != null)
- {
- queueConnection = queueFactory.
- createQueueConnection(user, config.getPasswd());
- }
- else
- {
- queueConnection = queueFactory.createQueueConnection();
- }
-
- // Lookup destination
- // First Try a lookup.
- // If that lookup fails then try to contact the MBeanServer and inoke a new...
- // Then do lookup again..
- String queueJndi = "queue/"+jndiSuffix;
- Queue queue;
- try {
- queue = (Queue)context.lookup(queueJndi);
- } catch(NamingException ne) {
- log.error("JndiName not found:"+queueJndi +
- "...attempting to recover", ne);
- server.invoke(new ObjectName("JMS:service=JMSServer"),
- "newQueue", new Object[]{jndiSuffix},
- new String[] {"java.lang.String"});
- queue = (Queue)context.lookup(queueJndi);
- }
-
- pool = poolFactory.
- getServerSessionPool(queueConnection,
- maxPoolSize,
- //Transacted
- true,
- acknowledgeMode,
- new MessageListenerImpl(this));
+ log.debug("Got destination type Queue for " + config.getEjbName());
+
+ // create a queue connection
+ Object qFactory = context.lookup(adapter.getQueueFactoryRef());
+ QueueConnection qConnection =
+ (QueueConnection)createConnection(qFactory, user, password);
+
+ // lookup or create the destination queue
+ Queue queue =
+ (Queue)createDestination(Queue.class,
+ context,
+ "queue/" + jndiSuffix,
+ jndiSuffix);
+
+ // set up the server session pool
+ pool = createSessionPool(qConnection,
+ maxPoolSize,
+ true, // tx
+ acknowledgeMode,
+ new MessageListenerImpl(this));
- connectionConsumer = queueConnection.
+ connectionConsumer = qConnection.
createConnectionConsumer(queue,
messageSelector,
pool,
maxMessagesNr);
- // set global connection, so we have something to
- // start() and close()
- connection = queueConnection;
log.debug("Queue connectionConsumer set up");
}
}
- // Start the connection
- public void start()
- throws Exception
+ /**
+ * Start the connection.
+ */
+ public void start() throws Exception
{
log.debug("Starting JMSContainerInvoker");
exListener = new ExceptionListenerImpl(this);
@@ -423,7 +492,9 @@
connection.start();
}
- // Stop the connection
+ /**
+ * Stop the connection.
+ */
public void stop()
{
log.debug("Stopping JMSContainerInvoker");
@@ -432,7 +503,6 @@
exListener.stop();
}
innerStop();
-
}
/**
@@ -443,6 +513,7 @@
try {
if (connection != null) {
connection.setExceptionListener(null);
+ log.debug("unset exception listener");
}
} catch (Exception e) {
log.error("Could not set ExceptionListener to null", e);
@@ -452,17 +523,21 @@
try {
if (connection != null) {
connection.stop();
+ log.debug("connection stopped");
}
} catch (Exception e) {
log.error("Could not stop JMS connection", e);
}
}
-
- // Take down all fixtures
+ /**
+ * Take down all fixtures.
+ */
public void destroy()
{
log.debug("Destroying JMSContainerInvoker");
+
+ // clear the server session pool (if it is clearable)
try {
if (pool instanceof StdServerSessionPool) {
StdServerSessionPool p = (StdServerSessionPool)pool;
@@ -471,26 +546,29 @@
} catch (Exception e) {
log.error("Could not clear ServerSessionPool", e);
}
-
+
+ // close the connection consumer
try {
if (connectionConsumer != null) {
connectionConsumer.close();
}
- } catch(Exception e) {
+ } catch (Exception e) {
log.error("Could not close consumer", e);
}
-
- try {
- if (connection != null) {
+
+ // close the connection
+ if (connection != null) {
+ try {
connection.close();
+ } catch (Exception e) {
+ log.error("Could not close connection", e);
}
- } catch(Exception e) {
- log.error("Could not close connection", e);
}
}
-
- // XmlLoadable implementation
+ /**
+ * XmlLoadable implementation
+ */
public void importXml(Element element) throws DeploymentException
{
try {
@@ -501,9 +579,9 @@
String maxSize = MetaData.getElementContent
(MetaData.getUniqueChild(element, "MaximumSize"));
maxPoolSize = Integer.parseInt(maxSize);
- } catch(NumberFormatException e) {
+ } catch (NumberFormatException e) {
//Noop will take default value
- } catch(DeploymentException e) {
+ } catch (DeploymentException e) {
//Noop will take default value
}
@@ -522,7 +600,6 @@
serverSessionPoolFactoryJNDI = "java:/"+serverSessionPoolFactoryJNDI;
}
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -530,7 +607,11 @@
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
-
+
+ /**
+ * An implementation of MessageListener that passes messages on
+ * to the container invoker.
+ */
class MessageListenerImpl
implements MessageListener
{
@@ -555,19 +636,14 @@
// Invoke, shuld we catch any Exceptions??
try {
- invoker.invoke(// Object id - where used?
- id,
- // Method to invoke
- ON_MESSAGE,
- //argument
- new Object[] {message},
- //Transaction
- tm.getTransaction(),
- //Principal
- null,
- //Cred
- null);
- } catch(Exception e) {
+ invoker.invoke(id, // Object id - where used?
+ ON_MESSAGE, // Method to invoke
+ new Object[] {message}, // argument
+ tm.getTransaction(), // Transaction
+ null, // Principal
+ null); // Cred
+ }
+ catch (Exception e) {
log.error("Exception in JMSCI message listener", e);
}
}
@@ -619,7 +695,7 @@
tryIt = false;
log.info("OK - reconnected");
}
- catch(Exception e) {
+ catch (Exception e) {
log.error("MDB error reconnecting", e);
}
}
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development