Hi,
I need to use XA connection and XA session created from it in
separate threads. Example context looks like:
<beans>
<bean id="broker" class="org.apache.activemq.broker.BrokerService"
init-method="start" destroy-method="stop">
<property name="persistent" value="false"/>
<property name="transportConnectorURIs">
<list>
<value>tcp://localhost:5000</value>
</list>
</property>
</bean>
<bean id="jotm"
class="org.springframework.transaction.jta.JotmFactoryBean"/>
<bean id="jotmTransactionManager"
class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="userTransaction" ref="jotm"/>
</bean>
<bean id="connectionFactory"
class="org.apache.activemq.ActiveMQXAConnectionFactory">
<property name="brokerURL" value="tcp://localhost:5000" />
</bean>
<bean id="messageReceiver"
class="org.springframework.transaction.interceptor.TransactionProxyFactoryBean">
<property name="transactionManager" ref="jotmTransactionManager"/>
<property name="transactionAttributes">
<props>
<prop key="*">PROPAGATION_REQUIRED</prop>
</props>
</property>
<property name="target">
<bean class="simple.MessageReceiverSimple">
<property name="jmsTemplate">
<bean
class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory"
ref="connectionFactory"/>
<property name="defaultDestinationName"
value="messages.input"/>
</bean>
</property>
</bean>
</property>
<property name="proxyTargetClass" value="true"/>
</bean>
</beans>
MessageReceiverSimple.java is:
public class MessageReceiverSimple {
private Log log = LogFactory.getLog(getClass());
private JmsTemplate jmsTemplate;
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public void receive() {
Thread readerThread = new Thread(new Runnable(){
public void run() {
while(!Thread.currentThread().isInterrupted()) {
Message message = jmsTemplate.receive();
log.debug(message);
}
}
});
readerThread.start();
}
}
In this example code plain JMS API can be used instead of JmsTemplate
but it is not important in this case. Result will be the same.
On running this example I got:
javax.jms.JMSException: Session's XAResource has not been enlisted in a
distributed transaction.
at
org.apache.activemq.ActiveMQXASession.doStartTransaction(ActiveMQXASession.java:109)
at
org.apache.activemq.ActiveMQMessageConsumer.acknowledge(ActiveMQMessageConsumer.java:711)
at
org.apache.activemq.ActiveMQMessageConsumer.dispose(ActiveMQMessageConsumer.java:572)
at
org.apache.activemq.ActiveMQMessageConsumer.close(ActiveMQMessageConsumer.java:515)
at
org.springframework.jms.support.JmsUtils.closeMessageConsumer(JmsUtils.java:105)
at
org.springframework.jms.core.JmsTemplate.doReceive(JmsTemplate.java:714)
at
org.springframework.jms.core.JmsTemplate.doReceive(JmsTemplate.java:677)
at
org.springframework.jms.core.JmsTemplate$9.doInJms(JmsTemplate.java:635)
at
org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:432)
at
org.springframework.jms.core.JmsTemplate.receive(JmsTemplate.java:632)
at
org.springframework.jms.core.JmsTemplate.receive(JmsTemplate.java:619)
at simple.MessageReceiverSimple$1.run(MessageReceiverSimple.java:22)
at java.lang.Thread.run(Thread.java:595)
So, I write simple ActiveMQXAConnectionFactory wrapper for enlisting
Session's XAResource in distributed transaction:
public class ActiveMQXAConnectionFactory implements ConnectionFactory,
XAConnectionFactory {
private XAConnectionFactory connectionFactory;
private JtaTransactionManager transactionManager;
public void setConnectionFactory(XAConnectionFactory connectionFactory)
{
this.connectionFactory = connectionFactory;
}
public void setTransactionManager(JtaTransactionManager
transactionManager) {
this.transactionManager = transactionManager;
}
public Connection createConnection() throws JMSException {
return createXAConnection();
}
public Connection createConnection(String userName, String password)
throws JMSException {
return createXAConnection(userName, password);
}
public XAConnection createXAConnection() throws JMSException {
XAConnection connection =
connectionFactory.createXAConnection();
return new ActiveMQXAConnection(connection, transactionManager);
}
public XAConnection createXAConnection(String userName, String
password) throws JMSException {
XAConnection connection =
connectionFactory.createXAConnection(userName, password);
return new ActiveMQXAConnection(connection, transactionManager);
}
public class ActiveMQXAConnection implements XAConnection {
private XAConnection connection;
private JtaTransactionManager transactionManager;
public ActiveMQXAConnection(XAConnection connection,
JtaTransactionManager transactionManager) {
this.connection = connection;
this.transactionManager = transactionManager;
}
public Session createSession(boolean transacted, int acknowledgeMode)
throws JMSException {
return createXASession();
}
public XASession createXASession() throws JMSException {
XASession session = connection.createXASession();
try {
transactionManager.getUserTransaction().begin();
transactionManager.getTransactionManager().getTransaction().enlistResource(session.getXAResource());
} catch (Exception e) {
e.printStackTrace();
}
return session;
}
public void close() throws JMSException {
connection.close();
}
public ConnectionConsumer createConnectionConsumer(Destination arg0,
String arg1, ServerSessionPool arg2, int arg3) throws JMSException {
return connection.createConnectionConsumer(arg0, arg1,
arg2, arg3);
}
public ConnectionConsumer createDurableConnectionConsumer(Topic arg0,
String arg1, String arg2, ServerSessionPool arg3, int arg4) throws
JMSException {
return connection.createDurableConnectionConsumer(arg0, arg1, arg2,
arg3, arg4);
}
public String getClientID() throws JMSException {
return connection.getClientID();
}
public ExceptionListener getExceptionListener() throws
JMSException {
return connection.getExceptionListener();
}
public ConnectionMetaData getMetaData() throws JMSException {
return connection.getMetaData();
}
public void setClientID(String arg0) throws JMSException {
connection.setClientID(arg0);
}
public void setExceptionListener(ExceptionListener arg0) throws
JMSException {
connection.setExceptionListener(arg0);
}
public void start() throws JMSException {
connection.start();
}
public void stop() throws JMSException {
connection.stop();
}
}
}
On using it instead of org.apache.activemq.ActiveMQXAConnectionFactory
it I got:
INFO BrokerService - ActiveMQ null JMS Message Broker (localhost) is
starting
INFO BrokerService - For help or more information please see:
http://incubator.apache.org/activemq/
INFO TransportServerThreadSupport - Listening for connections at:
tcp://prokopiev.stc.donpac.ru:5000
INFO TransportConnector - Connector tcp://prokopiev.stc.donpac.ru:5000
Started
INFO BrokerService - ActiveMQ JMS Message Broker (localhost,
ID:prokopiev.stc.donpac.ru-40533-1156158196797-0:0) started
INFO jotm - JOTM started with a local transaction factory which is not
bound.
INFO jotm - CAROL initialization
INFO ConfigurationRepository - No protocols were defined for property
'carol.protocols', trying with default protocol = 'jrmp'.
INFO jta - JOTM 2.0.10
INFO JtaTransactionManager - Using JTA UserTransaction:
[EMAIL PROTECTED]
INFO JtaTransactionManager - Using JTA TransactionManager:
[EMAIL PROTECTED]
INFO ManagementContext - JMX consoles can connect to
service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
INFO DefaultAopProxyFactory - CGLIB2 available: proxyTargetClass
feature enabled
DEBUG MessageReceiverSimple - ActiveMQObjectMessage {commandId = 5,
responseRequired = true, messageId =
ID:prokopiev.stc.donpac.ru-40541-1156158200439-0:0:1:1:1,
originalDestination = null, originalTransactionId = null, producerId =
ID:prokopiev.stc.donpac.ru-40541-1156158200439-0:0:1:1, destination =
queue://messages.input, transactionId = null, expiration = 0, timestamp
= 1156158200912, arrival = 0, correlationId = null, replyTo = null,
persistent = true, type = null, priority = 4, groupID = null,
groupSequence = 0, targetConsumerId = null, compressed = false, userID =
null, content = [EMAIL PROTECTED],
marshalledProperties = null, dataStructure = null, redeliveryCounter =
0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody
= true}
INFO jotm - set rollback only (tx=bb14:38:0:011d45be9b8fb301e8...44b402:)
So, my questions are:
1. My ActiveMQXAConnection.createXASession() implementation looks like
dirty hack and can't work propertly because transaction started but not
commited anywhere. What is the rigth place to start and commit/rollback
transaction?
2. Is it possible to include similar ActiveMQXAConnectionFactory
implementation into ActiveMQ? It will be very useful for using with
Spring DefaultMessageListenerContainer for example. Now JTA transactions
can't work with ActiveMQ/Spring/DefaultMessageListenerContainer.
I know about Jencks project but in some cases it can be more heavyweight
than Spring message driven POJO or any other custom consumer
implementation which must use JTA.
--
Thanks,
Eugene Prokopiev