On 8/21/06, Eugene Prokopiev <[EMAIL PROTECTED]> wrote:
Hi,I need to use XA connection and XA session created from it in separate threads. Example context looks like: <beans> <bean id="broker" class="org.apache.activemq.broker.BrokerService" init-method="start" destroy-method="stop"> <property name="persistent" value="false"/> <property name="transportConnectorURIs"> <list> <value>tcp://localhost:5000</value> </list> </property> </bean> <bean id="jotm" class="org.springframework.transaction.jta.JotmFactoryBean"/> <bean id="jotmTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"> <property name="userTransaction" ref="jotm"/> </bean> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory"> <property name="brokerURL" value="tcp://localhost:5000" /> </bean> <bean id="messageReceiver" class="org.springframework.transaction.interceptor.TransactionProxyFactoryBean"> <property name="transactionManager" ref="jotmTransactionManager"/> <property name="transactionAttributes"> <props> <prop key="*">PROPAGATION_REQUIRED</prop> </props> </property> <property name="target"> <bean class="simple.MessageReceiverSimple"> <property name="jmsTemplate"> <bean class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"/> <property name="defaultDestinationName" value="messages.input"/> </bean> </property> </bean> </property> <property name="proxyTargetClass" value="true"/> </bean> </beans> MessageReceiverSimple.java is: public class MessageReceiverSimple { private Log log = LogFactory.getLog(getClass()); private JmsTemplate jmsTemplate; public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } public void receive() { Thread readerThread = new Thread(new Runnable(){ public void run() { while(!Thread.currentThread().isInterrupted()) { Message message = jmsTemplate.receive(); log.debug(message); } } }); readerThread.start(); } } In this example code plain JMS API can be used instead of JmsTemplate but it is not important in this case. Result will be the same. On running this example I got: javax.jms.JMSException: Session's XAResource has not been enlisted in a distributed transaction. at org.apache.activemq.ActiveMQXASession.doStartTransaction(ActiveMQXASession.java:109) at org.apache.activemq.ActiveMQMessageConsumer.acknowledge(ActiveMQMessageConsumer.java:711) at org.apache.activemq.ActiveMQMessageConsumer.dispose(ActiveMQMessageConsumer.java:572) at org.apache.activemq.ActiveMQMessageConsumer.close(ActiveMQMessageConsumer.java:515) at org.springframework.jms.support.JmsUtils.closeMessageConsumer(JmsUtils.java:105) at org.springframework.jms.core.JmsTemplate.doReceive(JmsTemplate.java:714) at org.springframework.jms.core.JmsTemplate.doReceive(JmsTemplate.java:677) at org.springframework.jms.core.JmsTemplate$9.doInJms(JmsTemplate.java:635) at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:432) at org.springframework.jms.core.JmsTemplate.receive(JmsTemplate.java:632) at org.springframework.jms.core.JmsTemplate.receive(JmsTemplate.java:619) at simple.MessageReceiverSimple$1.run(MessageReceiverSimple.java:22) at java.lang.Thread.run(Thread.java:595) So, I write simple ActiveMQXAConnectionFactory wrapper for enlisting Session's XAResource in distributed transaction: public class ActiveMQXAConnectionFactory implements ConnectionFactory, XAConnectionFactory { private XAConnectionFactory connectionFactory; private JtaTransactionManager transactionManager; public void setConnectionFactory(XAConnectionFactory connectionFactory) { this.connectionFactory = connectionFactory; } public void setTransactionManager(JtaTransactionManager transactionManager) { this.transactionManager = transactionManager; } public Connection createConnection() throws JMSException { return createXAConnection(); } public Connection createConnection(String userName, String password) throws JMSException { return createXAConnection(userName, password); } public XAConnection createXAConnection() throws JMSException { XAConnection connection = connectionFactory.createXAConnection(); return new ActiveMQXAConnection(connection, transactionManager); } public XAConnection createXAConnection(String userName, String password) throws JMSException { XAConnection connection = connectionFactory.createXAConnection(userName, password); return new ActiveMQXAConnection(connection, transactionManager); } public class ActiveMQXAConnection implements XAConnection { private XAConnection connection; private JtaTransactionManager transactionManager; public ActiveMQXAConnection(XAConnection connection, JtaTransactionManager transactionManager) { this.connection = connection; this.transactionManager = transactionManager; } public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { return createXASession(); } public XASession createXASession() throws JMSException { XASession session = connection.createXASession(); try { transactionManager.getUserTransaction().begin(); transactionManager.getTransactionManager().getTransaction().enlistResource(session.getXAResource()); } catch (Exception e) { e.printStackTrace(); } return session; } public void close() throws JMSException { connection.close(); } public ConnectionConsumer createConnectionConsumer(Destination arg0, String arg1, ServerSessionPool arg2, int arg3) throws JMSException { return connection.createConnectionConsumer(arg0, arg1, arg2, arg3); } public ConnectionConsumer createDurableConnectionConsumer(Topic arg0, String arg1, String arg2, ServerSessionPool arg3, int arg4) throws JMSException { return connection.createDurableConnectionConsumer(arg0, arg1, arg2, arg3, arg4); } public String getClientID() throws JMSException { return connection.getClientID(); } public ExceptionListener getExceptionListener() throws JMSException { return connection.getExceptionListener(); } public ConnectionMetaData getMetaData() throws JMSException { return connection.getMetaData(); } public void setClientID(String arg0) throws JMSException { connection.setClientID(arg0); } public void setExceptionListener(ExceptionListener arg0) throws JMSException { connection.setExceptionListener(arg0); } public void start() throws JMSException { connection.start(); } public void stop() throws JMSException { connection.stop(); } } } On using it instead of org.apache.activemq.ActiveMQXAConnectionFactory it I got: INFO BrokerService - ActiveMQ null JMS Message Broker (localhost) is starting INFO BrokerService - For help or more information please see: http://incubator.apache.org/activemq/ INFO TransportServerThreadSupport - Listening for connections at: tcp://prokopiev.stc.donpac.ru:5000 INFO TransportConnector - Connector tcp://prokopiev.stc.donpac.ru:5000 Started INFO BrokerService - ActiveMQ JMS Message Broker (localhost, ID:prokopiev.stc.donpac.ru-40533-1156158196797-0:0) started INFO jotm - JOTM started with a local transaction factory which is not bound. INFO jotm - CAROL initialization INFO ConfigurationRepository - No protocols were defined for property 'carol.protocols', trying with default protocol = 'jrmp'. INFO jta - JOTM 2.0.10 INFO JtaTransactionManager - Using JTA UserTransaction: [EMAIL PROTECTED] INFO JtaTransactionManager - Using JTA TransactionManager: [EMAIL PROTECTED] INFO ManagementContext - JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi INFO DefaultAopProxyFactory - CGLIB2 available: proxyTargetClass feature enabled DEBUG MessageReceiverSimple - ActiveMQObjectMessage {commandId = 5, responseRequired = true, messageId = ID:prokopiev.stc.donpac.ru-40541-1156158200439-0:0:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:prokopiev.stc.donpac.ru-40541-1156158200439-0:0:1:1, destination = queue://messages.input, transactionId = null, expiration = 0, timestamp = 1156158200912, arrival = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = [EMAIL PROTECTED], marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true} INFO jotm - set rollback only (tx=bb14:38:0:011d45be9b8fb301e8...44b402:) So, my questions are: 1. My ActiveMQXAConnection.createXASession() implementation looks like dirty hack and can't work propertly because transaction started but not commited anywhere. What is the rigth place to start and commit/rollback transaction?
Normally the JCA container does this. If you are using Spring then the Spring Transaction Manager or Message Listener container shoudl do this - not the connection factory
2. Is it possible to include similar ActiveMQXAConnectionFactory implementation into ActiveMQ? It will be very useful for using with Spring DefaultMessageListenerContainer for example.
I'd rather fix Spring's container to work with any JMS provider properly than adding a dirty hack to ActiveMQ Enlistment is the responsibility of the container - be it Jencks, MDB container or Spring.
Now JTA transactions can't work with ActiveMQ/Spring/DefaultMessageListenerContainer.
You'd best ask the Spring guys - I'm not sure if the Spring container supports JTA -- James ------- http://radio.weblogs.com/0112098/
