Hi,

I need to use XA connection and XA session created from it in
separate threads. Example context looks like:

<beans>

<bean id="broker" class="org.apache.activemq.broker.BrokerService" init-method="start" destroy-method="stop">
                <property name="persistent" value="false"/>
                <property name="transportConnectorURIs">
                        <list>
                                <value>tcp://localhost:5000</value>
                        </list>
                </property>
        </bean>
        
<bean id="jotm" class="org.springframework.transaction.jta.JotmFactoryBean"/> <bean id="jotmTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
                <property name="userTransaction" ref="jotm"/>
        </bean>
        
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
                <property name="brokerURL" value="tcp://localhost:5000" />
        </bean>
        
<bean id="messageReceiver" class="org.springframework.transaction.interceptor.TransactionProxyFactoryBean">
        <property name="transactionManager" ref="jotmTransactionManager"/>
        <property name="transactionAttributes">
            <props>
                <prop key="*">PROPAGATION_REQUIRED</prop>
            </props>
        </property>
        <property name="target">
                <bean class="simple.MessageReceiverSimple">
                                <property name="jmsTemplate">
                                        <bean 
class="org.springframework.jms.core.JmsTemplate">
                                                <property name="connectionFactory" 
ref="connectionFactory"/>
                                                <property name="defaultDestinationName" 
value="messages.input"/>
                                        </bean>
                                </property>
                        </bean>
        </property>
                <property name="proxyTargetClass" value="true"/>
    </bean>
        
</beans>

MessageReceiverSimple.java is:

public class MessageReceiverSimple {

        private Log log = LogFactory.getLog(getClass());
        
        private JmsTemplate jmsTemplate;
        
        public void setJmsTemplate(JmsTemplate jmsTemplate) {
                this.jmsTemplate = jmsTemplate;
        }
        
        public void receive() {
                Thread readerThread = new Thread(new Runnable(){
                        public void run() {
                                while(!Thread.currentThread().isInterrupted()) {
                                        Message message = jmsTemplate.receive();
                                        log.debug(message);
                                }
                        }                       
                });
                readerThread.start();
        }
        
}

In this example code plain JMS API can be used instead of JmsTemplate but it is not important in this case. Result will be the same.

On running this example I got:

javax.jms.JMSException: Session's XAResource has not been enlisted in a distributed transaction. at org.apache.activemq.ActiveMQXASession.doStartTransaction(ActiveMQXASession.java:109) at org.apache.activemq.ActiveMQMessageConsumer.acknowledge(ActiveMQMessageConsumer.java:711) at org.apache.activemq.ActiveMQMessageConsumer.dispose(ActiveMQMessageConsumer.java:572) at org.apache.activemq.ActiveMQMessageConsumer.close(ActiveMQMessageConsumer.java:515) at org.springframework.jms.support.JmsUtils.closeMessageConsumer(JmsUtils.java:105)
        at 
org.springframework.jms.core.JmsTemplate.doReceive(JmsTemplate.java:714)
        at 
org.springframework.jms.core.JmsTemplate.doReceive(JmsTemplate.java:677)
        at 
org.springframework.jms.core.JmsTemplate$9.doInJms(JmsTemplate.java:635)
        at 
org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:432)
        at 
org.springframework.jms.core.JmsTemplate.receive(JmsTemplate.java:632)
        at 
org.springframework.jms.core.JmsTemplate.receive(JmsTemplate.java:619)
        at simple.MessageReceiverSimple$1.run(MessageReceiverSimple.java:22)
        at java.lang.Thread.run(Thread.java:595)

So, I write simple ActiveMQXAConnectionFactory wrapper for enlisting Session's XAResource in distributed transaction:

public class ActiveMQXAConnectionFactory implements ConnectionFactory, XAConnectionFactory {

        private XAConnectionFactory connectionFactory;
        private JtaTransactionManager transactionManager;
        
        public void setConnectionFactory(XAConnectionFactory connectionFactory) 
{
                this.connectionFactory = connectionFactory;
        }

public void setTransactionManager(JtaTransactionManager transactionManager) {
                this.transactionManager = transactionManager;
        }

        public Connection createConnection() throws JMSException {
                return createXAConnection();
        }

public Connection createConnection(String userName, String password) throws JMSException {
                return createXAConnection(userName, password);
        }

        public XAConnection createXAConnection() throws JMSException {
                XAConnection connection = 
connectionFactory.createXAConnection();
                return new ActiveMQXAConnection(connection, transactionManager);
        }

public XAConnection createXAConnection(String userName, String password) throws JMSException { XAConnection connection = connectionFactory.createXAConnection(userName, password);
                return new ActiveMQXAConnection(connection, transactionManager);
        }
        
        public class ActiveMQXAConnection implements XAConnection {

                private XAConnection connection;
                private JtaTransactionManager transactionManager;
                
public ActiveMQXAConnection(XAConnection connection, JtaTransactionManager transactionManager) {
                        this.connection = connection;
                        this.transactionManager = transactionManager;
                }
                
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
                        return createXASession();
                }

                public XASession createXASession() throws JMSException {
                        XASession session = connection.createXASession();
                        try {
                                transactionManager.getUserTransaction().begin();
transactionManager.getTransactionManager().getTransaction().enlistResource(session.getXAResource());
                        } catch (Exception e) {
                                e.printStackTrace();
                        }
                        return session;
                }

                public void close() throws JMSException {
                        connection.close();                     
                }

public ConnectionConsumer createConnectionConsumer(Destination arg0, String arg1, ServerSessionPool arg2, int arg3) throws JMSException {
                        return connection.createConnectionConsumer(arg0, arg1, 
arg2, arg3);
                }

public ConnectionConsumer createDurableConnectionConsumer(Topic arg0, String arg1, String arg2, ServerSessionPool arg3, int arg4) throws JMSException { return connection.createDurableConnectionConsumer(arg0, arg1, arg2, arg3, arg4);
                }

                public String getClientID() throws JMSException {
                        return connection.getClientID();
                }

                public ExceptionListener getExceptionListener() throws 
JMSException {
                        return connection.getExceptionListener();
                }

                public ConnectionMetaData getMetaData() throws JMSException {
                        return connection.getMetaData();
                }

                public void setClientID(String arg0) throws JMSException {
                        connection.setClientID(arg0);
                }

public void setExceptionListener(ExceptionListener arg0) throws JMSException {
                        connection.setExceptionListener(arg0);
                }

                public void start() throws JMSException {
                        connection.start();
                }

                public void stop() throws JMSException {
                        connection.stop();
                }
                
        }

}

On using it instead of org.apache.activemq.ActiveMQXAConnectionFactory it I got:

INFO BrokerService - ActiveMQ null JMS Message Broker (localhost) is starting INFO BrokerService - For help or more information please see: http://incubator.apache.org/activemq/ INFO TransportServerThreadSupport - Listening for connections at: tcp://prokopiev.stc.donpac.ru:5000 INFO TransportConnector - Connector tcp://prokopiev.stc.donpac.ru:5000 Started INFO BrokerService - ActiveMQ JMS Message Broker (localhost, ID:prokopiev.stc.donpac.ru-40533-1156158196797-0:0) started INFO jotm - JOTM started with a local transaction factory which is not bound.
INFO  jotm - CAROL initialization
INFO ConfigurationRepository - No protocols were defined for property 'carol.protocols', trying with default protocol = 'jrmp'.
INFO  jta - JOTM 2.0.10
INFO JtaTransactionManager - Using JTA UserTransaction: [EMAIL PROTECTED] INFO JtaTransactionManager - Using JTA TransactionManager: [EMAIL PROTECTED] INFO ManagementContext - JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi INFO DefaultAopProxyFactory - CGLIB2 available: proxyTargetClass feature enabled DEBUG MessageReceiverSimple - ActiveMQObjectMessage {commandId = 5, responseRequired = true, messageId = ID:prokopiev.stc.donpac.ru-40541-1156158200439-0:0:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:prokopiev.stc.donpac.ru-40541-1156158200439-0:0:1:1, destination = queue://messages.input, transactionId = null, expiration = 0, timestamp = 1156158200912, arrival = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = [EMAIL PROTECTED], marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true}
INFO  jotm - set rollback only (tx=bb14:38:0:011d45be9b8fb301e8...44b402:)

So, my questions are:

1. My ActiveMQXAConnection.createXASession() implementation looks like dirty hack and can't work propertly because transaction started but not commited anywhere. What is the rigth place to start and commit/rollback transaction?

2. Is it possible to include similar ActiveMQXAConnectionFactory implementation into ActiveMQ? It will be very useful for using with Spring DefaultMessageListenerContainer for example. Now JTA transactions can't work with ActiveMQ/Spring/DefaultMessageListenerContainer.

I know about Jencks project but in some cases it can be more heavyweight than Spring message driven POJO or any other custom consumer implementation which must use JTA.

--
Thanks,
Eugene Prokopiev


Reply via email to