User: pra
Date: 00/12/06 05:00:58
Added: src/main/org/jboss/ejb/plugins/jms JMSContainerInvoker.java
Log:
Added MessageDriven bean classes in ejb hierarchy. ContainerInvoker is changed to
support MessageDriven Beans
Revision Changes Path
1.1
jboss/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvoker.java
Index: JMSContainerInvoker.java
===================================================================
/*
* jBoss, the OpenSource EJB server
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.jboss.ejb.plugins.jms;
import java.util.Collection;
import java.util.Hashtable;
import java.lang.reflect.Method;
import java.security.Principal;
import javax.jms.*;
import javax.ejb.EJBMetaData;
import javax.ejb.EJBHome;
import javax.ejb.EJBObject;
import javax.naming.Name;
import javax.naming.InitialContext;
import javax.naming.Context;
import javax.naming.NamingException;
import javax.naming.NameNotFoundException;
import javax.transaction.Status;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import org.jboss.ejb.MethodInvocation;
import org.jboss.ejb.Container;
import org.jboss.ejb.ContainerInvokerContainer;
import org.jboss.ejb.Interceptor;
import org.jboss.ejb.ContainerInvoker;
import org.jboss.ejb.DeploymentException;
import org.jboss.tm.TxManager;
import org.jboss.logging.Logger;
import org.jboss.metadata.XmlLoadable;
import org.jboss.metadata.MetaData;
import org.jboss.metadata.MessageDrivenMetaData;
import org.jboss.jms.jndi.JMSProviderAdapter;
import org.jboss.jms.asf.ServerSessionPoolFactory;
import org.exolab.jms.client.JmsServerSessionPool;
import org.w3c.dom.Element;
//import org.exolab.jms.jndi.JndiConstants;
/**
* ContainerInvoker for JMS MessageDrivenBeans, based on JRMPContainerInvoker.
* <description>
*
* @see <related>
* @author Peter Antman ([EMAIL PROTECTED])
* @author Rickard �berg ([EMAIL PROTECTED])
* @author <a href="mailto:[EMAIL PROTECTED]">Sebastien
Alborini</a>
* @author <a href="mailto:[EMAIL PROTECTED]">Marc Fleury</a>
* @version $Revision: 1.1 $
*/
public class JMSContainerInvoker implements
ContainerInvoker, XmlLoadable
{
// Constants -----------------------------------------------------
static final String msgInterface = "javax.jms.MessageListener";
static final String msgMethod = "onMessage";
static final String msgArgument = "javax.jms.Message";
static Method listenerMethod;
static {
// Get the method
try {
Class msgInterfaceClass = Class.forName(msgInterface);
Class argumentClass = Class.forName(msgArgument);
listenerMethod = msgInterfaceClass.getMethod(msgMethod, new Class[]
{argumentClass});
} catch(ClassNotFoundException ex) {
Logger.error("Could not the classes for message interface" + msgInterface
+ ": " + ex);
}catch(NoSuchMethodException ex) {
Logger.error("Could not get the method for message interface" + msgMethod
+ ": " + ex);
}
};
// Attributes ----------------------------------------------------
protected boolean optimize = false;
protected int maxMessagesNr = 1;
protected int maxPoolSize = 15;
protected String jMSProviderAdapterJNDI;
protected String serverSessionPoolFactoryJNDI;
protected int acknowledgeMode;
protected Container container;
protected Connection connection;
protected ConnectionConsumer connectionConsumer;
protected TxManager tm;
/*
protected String jndiName;
protected EJBMetaDataImpl ejbMetaData;
*/
/*
// The home can be one.
protected EJBHome home;
// The Stateless Object can be one.
protected EJBObject statelessObject;
protected HashMap beanMethodInvokerMap;
protected HashMap homeMethodInvokerMap;
*/
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
public void setOptimized(boolean optimize)
{
this.optimize = optimize;
//DEBUG Logger.debug("Container Invoker optimize set to
'"+optimize+"'");
}
public boolean isOptimized()
{
//DEBUG Logger.debug("Optimize in action: '"+optimize+"'");
return optimize;
}
public EJBMetaData getEJBMetaData()
{
throw new Error("Not valid for MessageDriven beans");
}
// ContainerInvoker implementation
public EJBHome getEJBHome() {throw new Error("Not valid for MessageDriven
beans");}
public EJBObject getStatelessSessionEJBObject() {throw new Error("Not valid for
MessageDriven beans");}
public EJBObject getStatefulSessionEJBObject(Object id) {throw new Error("Not
valid for MessageDriven beans");}
public EJBObject getEntityEJBObject(Object id) {throw new Error("Not valid for
MessageDriven beans");}
public Collection getEntityCollection(Collection ids) {throw new Error("Not
valid for MessageDriven beans");}
public Object invoke(Object id, Method m, Object[] args, Transaction tx,
Principal identity, Object credential )
throws Exception
{
MethodInvocation mi = new MethodInvocation(id, m, args, tx, identity,
credential);
if (acknowledgeMode == MessageDrivenMetaData.CLIENT_ACKNOWLEDGE_MODE) {
// CMT with required
// Create tx
tm.begin();
Transaction txNew = tm.getTransaction();
//DEBUG Logger.debug("CMT with required, getting transaction
"+txNew);
mi.setTransaction(txNew);
}
// Set the right context classloader
ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(container.getClassLoader());
try
{
return container.invoke(mi);
} finally {
/*This is a test, to see if we may place tx logic here
for message ack. This is probably architecuraly NOT the
right place to do it. But the alternatives are wore:
1. Either it should be done further down the container chains.
This makes the impl more tied to JMS, wich is not good.
2. Or it should be made in the Listener, wich does not have acces
to the transaction.
*/
if (acknowledgeMode == MessageDrivenMetaData.CLIENT_ACKNOWLEDGE_MODE) {
//DEBUG Logger.debug("Transaction: " + mi.getTransaction());
// Handle CMT with required
try {
Transaction t = mi.getTransaction();
if (t !=null && t.getStatus() != Status.STATUS_MARKED_ROLLBACK) {
if (args[0] != null) {
// actually roll it back
t.rollback();
// How dangerus is this ;-)
Message msg = (Message)args[0];
msg.acknowledge();
}
} else {
t.commit();
}
}catch(Exception ex) {
Logger.error("JMSContainerInvoker: Error in acking CMT Required " +
ex);
}
}
Thread.currentThread().setContextClassLoader(oldCl);
}
}
// ContainerService implementation -------------------------------
public void setContainer(Container con)
{
this.container = con;
//jndiName = container.getBeanMetaData().getJndiName();
}
public void init()
throws Exception
{
// Set transaction manager - should this be done for msg to??
// GenericProxy.setTransactionManager(container.getTransactionManager());
//String destinationJNDI = "test2";
// Store TM reference locally - should we test for CMT Required
tm = (TxManager) container.getTransactionManager();
/*
* Get configuration information - from EJB-xml
*/
MessageDrivenMetaData config =
((MessageDrivenMetaData)container.getBeanMetaData());
// Selector
String messageSelector = config.getMessageSelector();
// Queue or Topic
String destinationType = config.getDestinationType();
// Is containermanages TX
boolean isContainerManagedTx = config.isContainerManagedTx();
acknowledgeMode = config.getAcknowledgeMode();
// Get configuration data from jboss.xml
String destinationJNDI = config.getDestinationJndiName();
/*
* Set upp JNDI
* connect to the JNDI server and get a reference to
* root context
*/
Context context = null;
Context jbossContext = new InitialContext();
JMSProviderAdapter adapter =
(JMSProviderAdapter)jbossContext.lookup(jMSProviderAdapterJNDI);
context = adapter.getInitialContext();
// if we can't get the root context then exit with an exception
if (context == null)
{
throw new RuntimeException("Failed to get the root context");
}
// Set up pool
// FIXME - this way we don't know how to set acknowlede mode and
// transaction mode
ServerSessionPoolFactory poolFactory =
(ServerSessionPoolFactory)jbossContext.lookup(serverSessionPoolFactoryJNDI);
if (destinationType.equals("javax.jms.Topic")) {
Logger.debug("Got destination type Topic");
// All classes are different between topics and queues!!
TopicConnectionFactory topicFactory =
(TopicConnectionFactory)context.lookup(adapter.getTopicFactoryName());
TopicConnection topicConnection = topicFactory.createTopicConnection();
// Test to set an identity
//topicConnection.setClientID(((MessageDrivenMetaData)container.getBeanMetaData()).getEjbName());
// Lookup destination
Topic topic = (Topic)context.lookup(destinationJNDI);
ServerSessionPool pool =
poolFactory.getServerSessionPool(topicConnection,maxPoolSize, false, acknowledgeMode,
new MessageListenerImpl(this));
// To be no-durable or durable
if (config.getSubscriptionDurability() !=
MessageDrivenMetaData.DURABLE_SUBSCRIPTION) {
// Create non durable
connectionConsumer = topicConnection.createConnectionConsumer(topic,
messageSelector, pool, maxMessagesNr);
} else {
// Create durable - FIXME durable name!!
connectionConsumer =
topicConnection.createDurableConnectionConsumer(topic, messageSelector, pool,
maxMessagesNr);
}
// set global connection, so we have something to start() and close()
connection = topicConnection;
Logger.debug("Topic connectionConsumer set up");
}else if(destinationType.equals("javax.jms.Queue")) {
Logger.debug("Got destination type Queue");
QueueConnectionFactory queueFactory =
(QueueConnectionFactory)context.lookup(adapter.getQueueFactoryName());
QueueConnection queueConnection = queueFactory.createQueueConnection();
// Test to set an identity
//topicConnection.setClientID(((MessageDrivenMetaData)container.getBeanMetaData()).getEjbName());
// Lookup destination
Queue queue = (Queue)context.lookup(destinationJNDI);
ServerSessionPool pool =
poolFactory.getServerSessionPool(queueConnection,maxPoolSize, false, acknowledgeMode,
new MessageListenerImpl(this));
connectionConsumer = queueConnection.createConnectionConsumer(queue,
messageSelector, pool, maxMessagesNr);
// set global connection, so we have something to start() and close()
connection = queueConnection;
Logger.debug("Queue connectionConsumer set up");
}
}
public void start()
throws Exception
{
Logger.debug("Starting JMSContainerInvoker");
connection.start();
}
// What are the differences between stop and destroy?
public void stop()
{
Logger.debug("Stopping JMSContainerInvoker");
try {
connectionConsumer.close();
connection.close();
}catch(JMSException ex) {
Logger.log("Could not stop JMSContainerInvoker:" + ex);
}
}
public void destroy()
{
Logger.debug("Destroying JMSContainerInvoker");
}
// XmlLoadable implementation
public void importXml(Element element) throws DeploymentException {
//String opt = MetaData.getElementContent(MetaData.getUniqueChild(elem
//Load needed metadata here
try {
String maxMessages =
MetaData.getElementContent(MetaData.getUniqueChild(element, "MaxMessages"));
maxMessagesNr = Integer.parseInt(maxMessages);
String maxSize =
MetaData.getElementContent(MetaData.getUniqueChild(element, "MaximumSize"));
maxPoolSize = Integer.parseInt(maxSize);
} catch(NumberFormatException e) {
//Noop will take default value
} catch(DeploymentException e) {
//Noop will take default value
}
// If these are not found we will get a DeploymentException, I hope
jMSProviderAdapterJNDI =
MetaData.getElementContent(MetaData.getUniqueChild(element, "JMSProviderAdapterJNDI"));
serverSessionPoolFactoryJNDI =
MetaData.getElementContent(MetaData.getUniqueChild(element,
"ServerSessionPoolFactoryJNDI"));
// Check java:/ prefix
if (!jMSProviderAdapterJNDI.startsWith("java:/"))
jMSProviderAdapterJNDI = "java:/"+jMSProviderAdapterJNDI;
if (!serverSessionPoolFactoryJNDI.startsWith("java:/"))
serverSessionPoolFactoryJNDI = "java:/"+serverSessionPoolFactoryJNDI;
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
class MessageListenerImpl implements MessageListener {
JMSContainerInvoker invoker = null;
MessageListenerImpl(JMSContainerInvoker invoker) {
this.invoker = invoker;
}
public void onMessage(Message message)
{
Logger.debug(
"[" + Thread.currentThread().hashCode() +
"] Processing message " + message);
Object id;
try {
id = message.getJMSMessageID();
}catch(javax.jms.JMSException ex) {
id = "JMSContainerInvoke";
}
// Invoke, shuld we catch any Exceptions??
try {
invoker.invoke(
// Object id - where used?
id,
// Method to invoke
listenerMethod,
//argument
new Object[] {message},
//Transaction - where do we get these
null,
//Principal
null,
//Cred
null);
}catch(Exception ex) {
Logger.log("Exception in CI message listener: panick what to do???: "
+ ex);
ex.printStackTrace();
}
//try {
// Should be handled up the chain handled either in invoke
// finally with transaction, or in ASF stuff othe ack modes
//message.acknowledge();
//}catch(JMSException ex) {
// Logger.log("Exception in CI message listener: could not acknowledge
the message: " + ex);
//}
}
}
}