User: pra
Date: 01/02/28 01:25:42
Modified: src/main/org/jboss/ejb/plugins/jms JMSContainerInvoker.java
Log:
MDB deployment descriptor now follows the latest EJB2.0 DTD; code clean up done;
message receipt now allways transacted
Revision Changes Path
1.7 +170 -205
jboss/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvoker.java
Index: JMSContainerInvoker.java
===================================================================
RCS file:
/products/cvs/ejboss/jboss/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvoker.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- JMSContainerInvoker.java 2001/01/08 21:57:08 1.6
+++ JMSContainerInvoker.java 2001/02/28 09:25:42 1.7
@@ -59,7 +59,7 @@
* @author Rickard �berg ([EMAIL PROTECTED])
* @author <a href="mailto:[EMAIL PROTECTED]">Sebastien
Alborini</a>
* @author <a href="mailto:[EMAIL PROTECTED]">Marc Fleury</a>
- * @version $Revision: 1.6 $
+ * @version $Revision: 1.7 $
*/
public class JMSContainerInvoker implements
ContainerInvoker, XmlLoadable
@@ -97,20 +97,6 @@
protected ConnectionConsumer connectionConsumer;
protected TxManager tm;
- /*
- protected String jndiName;
-
- protected EJBMetaDataImpl ejbMetaData;
- */
- /*
- // The home can be one.
- protected EJBHome home;
- // The Stateless Object can be one.
- protected EJBObject statelessObject;
-
- protected HashMap beanMethodInvokerMap;
- protected HashMap homeMethodInvokerMap;
- */
// Static --------------------------------------------------------
@@ -120,12 +106,12 @@
public void setOptimized(boolean optimize)
{
this.optimize = optimize;
-//DEBUG Logger.debug("Container Invoker optimize set to
'"+optimize+"'");
+ //DEBUG Logger.debug("Container Invoker optimize set to
'"+optimize+"'");
}
public boolean isOptimized()
{
-//DEBUG Logger.debug("Optimize in action: '"+optimize+"'");
+ //DEBUG Logger.debug("Optimize in action: '"+optimize+"'");
return optimize;
}
@@ -150,73 +136,18 @@
Principal identity, Object credential )
throws Exception
{
-
+
MethodInvocation mi = new MethodInvocation(id, m, args, tx, identity,
credential);
-
- // HC: Transaction are started in the run() of the server session
- // since the XAResource of the session might need to be elisted with the TM.
- /*
- if (acknowledgeMode == MessageDrivenMetaData.CLIENT_ACKNOWLEDGE_MODE) {
- // CMT with required
- // Create tx
- tm.begin();
- Transaction txNew = tm.getTransaction();
- //DEBUG Logger.debug("CMT with required, getting transaction
"+txNew);
-
- mi.setTransaction(txNew);
- }
- */
// Set the right context classloader
ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(container.getClassLoader());
-
+
try
- {
+ {
return container.invoke(mi);
- } finally {
-
- /*This is a test, to see if we may place tx logic here
- for message ack. This is probably architecuraly NOT the
- right place to do it. But the alternatives are wore:
- 1. Either it should be done further down the container chains.
- This makes the impl more tied to JMS, wich is not good.
- 2. Or it should be made in the Listener, wich does not have acces
- to the transaction.
- */
- /* HC: This has to be done further down the chains since if the bean is
- is a CMT with transactions required, the message receipt should be
- part of the transaction (in other words the JMS session has to be transacted
- and it's commit or rollback will cause the ack or nack of the message)
- Refrence the ejb_2_0 spec section :�14.4.7
- */
-
- /*
- if (acknowledgeMode == MessageDrivenMetaData.CLIENT_ACKNOWLEDGE_MODE) {
- //DEBUG Logger.debug("Transaction: " + mi.getTransaction());
-
- // Handle CMT with required
- try {
- Transaction t = mi.getTransaction();
- if (t !=null && t.getStatus() != Status.STATUS_MARKED_ROLLBACK) {
- if (args[0] != null) {
- // actually roll it back
- t.rollback();
- // How dangerus is this ;-)
- Message msg = (Message)args[0];
- msg.acknowledge();
- }
- } else {
- t.commit();
- }
-
- }catch(Exception ex) {
- Logger.error("JMSContainerInvoker: Error in acking CMT Required " +
ex);
- }
- }
- */
-
+ } finally {
Thread.currentThread().setContextClassLoader(oldCl);
}
}
@@ -231,9 +162,6 @@
public void init()
throws Exception
{
- // Set transaction manager - should this be done for msg to??
- // GenericProxy.setTransactionManager(container.getTransactionManager());
- //String destinationJNDI = "test2";
// Store TM reference locally - should we test for CMT Required
tm = (TxManager) container.getTransactionManager();
@@ -277,154 +205,198 @@
}
// Set up pool
- // FIXME - this way we don't know how to set acknowlede mode and
- // transaction mode
ServerSessionPoolFactory poolFactory =
(ServerSessionPoolFactory)jbossContext.lookup(serverSessionPoolFactoryJNDI);
- if (destinationType.equals("javax.jms.Topic")) {
- Logger.debug("Got destination type Topic for " + config.getEjbName());
-
- // All classes are different between topics and queues!!
- TopicConnectionFactory topicFactory =
- (TopicConnectionFactory)context.lookup(adapter.getTopicFactoryName());
- // Do we have a user - this is messy code (should be done for queues to)
- TopicConnection topicConnection;
- if(user != null) {
- Logger.debug("Creating topic connection with user: " + user +
- " passwd: " + config.getPasswd());
- topicConnection = topicFactory.createTopicConnection(user,
config.getPasswd());
- }else {
- topicConnection = topicFactory.createTopicConnection();
- }
-
- // Lookup destination
- Topic topic = (Topic)context.lookup(destinationJNDI);
-
- ServerSessionPool pool =
poolFactory.getServerSessionPool(topicConnection,maxPoolSize, false, acknowledgeMode,
new MessageListenerImpl(this));
+ if (destinationType.equals("javax.jms.Topic"))
+ {
+ Logger.debug("Got destination type Topic for " + config.getEjbName());
+
+ // All classes are different between topics and queues!!
+ TopicConnectionFactory topicFactory =
+ (TopicConnectionFactory)context.
+ lookup(adapter.getTopicFactoryName());
+ // Do we have a user - this is messy code (should be done for queues to)
+ TopicConnection topicConnection;
+ if(user != null)
+ {
+ Logger.debug("Creating topic connection with user: " +
+ user + " passwd: " + config.getPasswd());
+ topicConnection = topicFactory.
+ createTopicConnection(user, config.getPasswd());
+ }
+ else
+ {
+ topicConnection = topicFactory.createTopicConnection();
+ }
+
+ // Lookup destination
+ Topic topic = (Topic)context.lookup(destinationJNDI);
+
+ ServerSessionPool pool = poolFactory.
+ getServerSessionPool(
+ topicConnection,
+ maxPoolSize,
+ //Transacted
+ true,
+ acknowledgeMode,
+ new MessageListenerImpl(this));
// To be no-durable or durable
- if (config.getSubscriptionDurability() !=
MessageDrivenMetaData.DURABLE_SUBSCRIPTION) {
+ if (config.getSubscriptionDurability() !=
+ MessageDrivenMetaData.DURABLE_SUBSCRIPTION)
+ {
// Create non durable
- connectionConsumer = topicConnection.createConnectionConsumer(topic,
messageSelector, pool, maxMessagesNr);
- } else {
- // ClientId
- // Test to set an identity - have to be there for durable!
- String clientId = config.getClientId();
- //topicConnection.setClientID(clientId);
- // Create durable - FIXME durable name!!
- //String durableName = config.getEjbName();
- String durableName = clientId != null ?
- clientId:
+ connectionConsumer = topicConnection.
+ createConnectionConsumer(
+ topic,
+ messageSelector,
+ pool,
+ maxMessagesNr);
+ }
+ else
+ {
+ //Durable subscription
+ String clientId = config.getClientId();
+ String durableName = clientId != null ?
+ clientId:
config.getEjbName();
- connectionConsumer =
topicConnection.createDurableConnectionConsumer(topic, durableName,messageSelector,
pool, maxMessagesNr);
+ connectionConsumer = topicConnection.
+ createDurableConnectionConsumer(
+ topic,
+ durableName,
+ messageSelector,
+ pool,
+ maxMessagesNr);
}
// set global connection, so we have something to start() and close()
connection = topicConnection;
Logger.debug("Topic connectionConsumer set up");
- }else if(destinationType.equals("javax.jms.Queue")) {
- Logger.debug("Got destination type Queue");
- QueueConnectionFactory queueFactory =
- (QueueConnectionFactory)context.lookup(adapter.getQueueFactoryName());
-
- // Do we have a user
- QueueConnection queueConnection;
- if (user != null) {
- queueConnection =
queueFactory.createQueueConnection(user,config.getPasswd());
- } else {
- queueConnection = queueFactory.createQueueConnection();
- }
- // Test to set an identity
-
//topicConnection.setClientID(((MessageDrivenMetaData)container.getBeanMetaData()).getEjbName());
- // Lookup destination
- Queue queue = (Queue)context.lookup(destinationJNDI);
-
- ServerSessionPool pool =
poolFactory.getServerSessionPool(queueConnection,maxPoolSize, false, acknowledgeMode,
new MessageListenerImpl(this));
-
+ }
+ else if(destinationType.equals("javax.jms.Queue"))
+ {
+ Logger.debug("Got destination type Queue");
+ QueueConnectionFactory queueFactory =
+
(QueueConnectionFactory)context.lookup(adapter.getQueueFactoryName());
+
+ // Do we have a user
+ QueueConnection queueConnection;
+ if (user != null)
+ {
+ queueConnection = queueFactory.
+ createQueueConnection(
+ user,
+ config.getPasswd());
+ }
+ else
+ {
+ queueConnection = queueFactory.createQueueConnection();
+ }
- connectionConsumer = queueConnection.createConnectionConsumer(queue,
messageSelector, pool, maxMessagesNr);
+ // Lookup destination
+ Queue queue = (Queue)context.lookup(destinationJNDI);
+
+ ServerSessionPool pool = poolFactory.
+ getServerSessionPool(
+ queueConnection,
+ maxPoolSize,
+ //Transacted
+ true,
+ acknowledgeMode,
+ new MessageListenerImpl(this));
+
+
+ connectionConsumer = queueConnection.
+ createConnectionConsumer(
+ queue,
+ messageSelector,
+ pool,
+ maxMessagesNr);
// set global connection, so we have something to start() and close()
connection = queueConnection;
Logger.debug("Queue connectionConsumer set up");
- }
+ }
}
-
+
public void start()
- throws Exception
+ throws Exception
{
Logger.debug("Starting JMSContainerInvoker");
connection.start();
}
-
+
// What are the differences between stop and destroy?
- public void stop()
- {
- Logger.debug("Stopping JMSContainerInvoker");
- try {
- connectionConsumer.close();
- connection.close();
- }catch(JMSException ex) {
- Logger.log("Could not stop JMSContainerInvoker:" + ex);
- }
- }
-
- public void destroy()
- {
- Logger.debug("Destroying JMSContainerInvoker");
- }
-
- // XmlLoadable implementation
- public void importXml(Element element) throws DeploymentException {
- //String opt = MetaData.getElementContent(MetaData.getUniqueChild(elem
- //Load needed metadata here
-
- try {
- String maxMessages =
MetaData.getElementContent(MetaData.getUniqueChild(element, "MaxMessages"));
- maxMessagesNr = Integer.parseInt(maxMessages);
- String maxSize =
MetaData.getElementContent(MetaData.getUniqueChild(element, "MaximumSize"));
- maxPoolSize = Integer.parseInt(maxSize);
- } catch(NumberFormatException e) {
- //Noop will take default value
- } catch(DeploymentException e) {
- //Noop will take default value
- }
-
- // If these are not found we will get a DeploymentException, I hope
- jMSProviderAdapterJNDI =
MetaData.getElementContent(MetaData.getUniqueChild(element, "JMSProviderAdapterJNDI"));
- serverSessionPoolFactoryJNDI =
MetaData.getElementContent(MetaData.getUniqueChild(element,
"ServerSessionPoolFactoryJNDI"));
-
- // Check java:/ prefix
- if (!jMSProviderAdapterJNDI.startsWith("java:/"))
- jMSProviderAdapterJNDI = "java:/"+jMSProviderAdapterJNDI;
- if (!serverSessionPoolFactoryJNDI.startsWith("java:/"))
- serverSessionPoolFactoryJNDI = "java:/"+serverSessionPoolFactoryJNDI;
- }
-
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
+ public void stop()
+ {
+ Logger.debug("Stopping JMSContainerInvoker");
+ try {
+ if (connectionConsumer != null)
+ connectionConsumer.close();
+ if (connection != null)
+ connection.close();
+ }catch(JMSException ex) {
+ Logger.log("Could not stop JMSContainerInvoker:" + ex);
+ }
+ }
+
+ public void destroy()
+ {
+ Logger.debug("Destroying JMSContainerInvoker");
+ }
+
+ // XmlLoadable implementation
+ public void importXml(Element element) throws DeploymentException
+ {
+
+ try {
+ String maxMessages =
MetaData.getElementContent(MetaData.getUniqueChild(element, "MaxMessages"));
+ maxMessagesNr = Integer.parseInt(maxMessages);
+ String maxSize =
MetaData.getElementContent(MetaData.getUniqueChild(element, "MaximumSize"));
+ maxPoolSize = Integer.parseInt(maxSize);
+ } catch(NumberFormatException e) {
+ //Noop will take default value
+ } catch(DeploymentException e) {
+ //Noop will take default value
+ }
+
+ // If these are not found we will get a DeploymentException, I hope
+ jMSProviderAdapterJNDI =
MetaData.getElementContent(MetaData.getUniqueChild(element, "JMSProviderAdapterJNDI"));
+ serverSessionPoolFactoryJNDI =
MetaData.getElementContent(MetaData.getUniqueChild(element,
"ServerSessionPoolFactoryJNDI"));
+
+ // Check java:/ prefix
+ if (!jMSProviderAdapterJNDI.startsWith("java:/"))
+ jMSProviderAdapterJNDI = "java:/"+jMSProviderAdapterJNDI;
+ if (!serverSessionPoolFactoryJNDI.startsWith("java:/"))
+ serverSessionPoolFactoryJNDI = "java:/"+serverSessionPoolFactoryJNDI;
+ }
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
class MessageListenerImpl implements MessageListener {
-
-
+
+
JMSContainerInvoker invoker = null;
-
+
MessageListenerImpl(JMSContainerInvoker invoker) {
this.invoker = invoker;
}
public void onMessage(Message message)
{
- Logger.debug(
- "[" + Thread.currentThread().hashCode() +
- "] Processing message " + message);
+ /*
+ Logger.debug(
+ "[" + Thread.currentThread().hashCode() +
+ "] Processing message " + message);
+ */
Object id;
try {
id = message.getJMSMessageID();
@@ -434,29 +406,22 @@
// Invoke, shuld we catch any Exceptions??
try {
invoker.invoke(
- // Object id - where used?
+ // Object id - where used?
id,
// Method to invoke
listenerMethod,
//argument
new Object[] {message},
//Transaction
- tm.getTransaction(),
+ tm.getTransaction(),
//Principal
null,
//Cred
null);
}catch(Exception ex) {
- Logger.log("Exception in CI message listener: panick what to do???: "
+ ex);
+ Logger.log("Exception in JMSCI message listener: : " + ex);
ex.printStackTrace();
}
- //try {
- // Should be handled up the chain handled either in invoke
- // finally with transaction, or in ASF stuff othe ack modes
- //message.acknowledge();
- //}catch(JMSException ex) {
- // Logger.log("Exception in CI message listener: could not acknowledge
the message: " + ex);
- //}
}