Author: dejanb
Date: Fri Oct 8 12:08:32 2010
New Revision: 1005794
URL: http://svn.apache.org/viewvc?rev=1005794&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2950 - additional fix to support
parallel transactions
Added:
activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/spring/ParallelXATransactionTest.java
activemq/trunk/activemq-spring/src/test/resources/spring/xa.xml
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java
activemq/trunk/activemq-spring/pom.xml
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java?rev=1005794&r1=1005793&r2=1005794&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
Fri Oct 8 12:08:32 2010
@@ -58,7 +58,7 @@ public class TransactionBroker extends B
// The prepared XA transactions.
private TransactionStore transactionStore;
- private Map<TransactionId, Transaction> xaTransactions = new
LinkedHashMap<TransactionId, Transaction>();
+ private Map<TransactionId, XATransaction> xaTransactions = new
LinkedHashMap<TransactionId, XATransaction>();
private ActiveMQMessageAudit audit;
public TransactionBroker(Broker next, TransactionStore transactionStore) {
@@ -125,7 +125,7 @@ public class TransactionBroker extends B
public TransactionId[] getPreparedTransactions(ConnectionContext context)
throws Exception {
List<TransactionId> txs = new ArrayList<TransactionId>();
synchronized (xaTransactions) {
- for (Iterator<Transaction> iter =
xaTransactions.values().iterator(); iter.hasNext();) {
+ for (Iterator<XATransaction> iter =
xaTransactions.values().iterator(); iter.hasNext();) {
Transaction tx = iter.next();
if (tx.isPrepared()) {
if (LOG.isDebugEnabled()) {
@@ -146,13 +146,13 @@ public class TransactionBroker extends B
public void beginTransaction(ConnectionContext context, TransactionId xid)
throws Exception {
// the transaction may have already been started.
if (xid.isXATransaction()) {
- Transaction transaction = null;
+ XATransaction transaction = null;
synchronized (xaTransactions) {
transaction = xaTransactions.get(xid);
if (transaction != null) {
return;
}
- transaction = new XATransaction(transactionStore,
(XATransactionId)xid, this);
+ transaction = new XATransaction(transactionStore,
(XATransactionId)xid, this, context.getConnectionId());
xaTransactions.put(xid, transaction);
}
} else {
@@ -252,9 +252,10 @@ public class TransactionBroker extends B
iter.remove();
}
- for (Transaction tx : xaTransactions.values()) {
+
+ for (XATransaction tx : xaTransactions.values()) {
try {
- if (!tx.isPrepared()) {
+ if (tx.getConnectionId().equals(info.getConnectionId()) &&
!tx.isPrepared()) {
tx.rollback();
}
} catch (Exception e) {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java?rev=1005794&r1=1005793&r2=1005794&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java
Fri Oct 8 12:08:32 2010
@@ -20,6 +20,7 @@ import java.io.IOException;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import org.apache.activemq.broker.TransactionBroker;
+import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.TransactionStore;
@@ -36,11 +37,13 @@ public class XATransaction extends Trans
private final TransactionStore transactionStore;
private final XATransactionId xid;
private final TransactionBroker broker;
+ private final ConnectionId connectionId;
- public XATransaction(TransactionStore transactionStore, XATransactionId
xid, TransactionBroker broker) {
+ public XATransaction(TransactionStore transactionStore, XATransactionId
xid, TransactionBroker broker, ConnectionId connectionId) {
this.transactionStore = transactionStore;
this.xid = xid;
this.broker = broker;
+ this.connectionId = connectionId;
if (LOG.isDebugEnabled()) {
LOG.debug("XA Transaction new/begin : " + xid);
}
@@ -199,6 +202,10 @@ public class XATransaction extends Trans
broker.removeTransaction(xid);
}
+ public ConnectionId getConnectionId() {
+ return connectionId;
+ }
+
@Override
public TransactionId getTransactionId() {
return xid;
Modified: activemq/trunk/activemq-spring/pom.xml
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/pom.xml?rev=1005794&r1=1005793&r2=1005794&view=diff
==============================================================================
--- activemq/trunk/activemq-spring/pom.xml (original)
+++ activemq/trunk/activemq-spring/pom.xml Fri Oct 8 12:08:32 2010
@@ -115,6 +115,29 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.jencks</groupId>
+ <artifactId>jencks</artifactId>
+ <version>2.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.4.3</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.4.3</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>activemq-ra</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.springframework.osgi</groupId>
<artifactId>spring-osgi-core</artifactId>
<exclusions>
Added:
activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/spring/ParallelXATransactionTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/spring/ParallelXATransactionTest.java?rev=1005794&view=auto
==============================================================================
---
activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/spring/ParallelXATransactionTest.java
(added)
+++
activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/spring/ParallelXATransactionTest.java
Fri Oct 8 12:08:32 2010
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.spring;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.MessageCreator;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+import org.springframework.test.context.transaction.TransactionConfiguration;
+import org.springframework.transaction.PlatformTransactionManager;
+import org.springframework.transaction.TransactionException;
+import org.springframework.transaction.TransactionStatus;
+import
org.springframework.transaction.support.TransactionCallbackWithoutResult;
+import org.springframework.transaction.support.TransactionTemplate;
+
+import javax.annotation.Resource;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+import java.util.Arrays;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+
+@ContextConfiguration(locations = {"classpath:spring/xa.xml"})
+@TransactionConfiguration(transactionManager = "transactionManager",
defaultRollback = false)
+public class ParallelXATransactionTest {
+
+ private static final Log LOG =
LogFactory.getLog(ParallelXATransactionTest.class);
+
+ @Resource(name = "transactionManager")
+ PlatformTransactionManager txManager = null;
+
+ @Resource(name = "transactionManager2")
+ PlatformTransactionManager txManager2 = null;
+
+
+ @Resource(name = "jmsTemplate")
+ JmsTemplate jmsTemplate = null;
+
+ @Resource(name = "jmsTemplate2")
+ JmsTemplate jmsTemplate2 = null;
+
+
+ public static final int NB_MSG = 100;
+ public static final String BODY = Arrays.toString(new int[1024]);
+ private static final String[] QUEUES = {"TEST.queue1", "TEST.queue2",
"TEST.queue3", "TEST.queue4", "TEST.queue5"};
+ private static final String AUDIT = "TEST.audit";
+ public static final int SLEEP = 500;
+
+ @Test
+ @DirtiesContext
+ public void testParalellXaTx() throws Exception {
+
+
+ class ProducerThread extends Thread {
+
+ PlatformTransactionManager txManager;
+ JmsTemplate jmsTemplate;
+ Exception lastException;
+
+
+ public ProducerThread(JmsTemplate jmsTemplate,
PlatformTransactionManager txManager) {
+ this.jmsTemplate = jmsTemplate;
+ this.txManager = txManager;
+ }
+
+ public void run() {
+ int i = 0;
+ while (i++ < 10) {
+
+ try {
+ Thread.sleep((long) (Math.random() * SLEEP));
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ TransactionTemplate tt = new
TransactionTemplate(this.txManager);
+
+
+ try {
+ tt.execute(new TransactionCallbackWithoutResult() {
+ @Override
+ protected void
doInTransactionWithoutResult(TransactionStatus status) {
+ try {
+
+ for (final String queue : QUEUES) {
+ jmsTemplate.send(queue + "," + AUDIT,
new MessageCreator() {
+ public Message
createMessage(Session session) throws JMSException {
+ return
session.createTextMessage("P1: " + queue + " - " + BODY);
+ }
+ });
+ Thread.sleep((long) (Math.random() *
SLEEP));
+ LOG.info("P1: Send msg to " + queue +
"," + AUDIT);
+ }
+
+ } catch (Exception e) {
+ Assert.fail("Exception occurred " + e);
+ }
+
+
+ }
+ });
+ } catch (TransactionException e) {
+ lastException = e;
+ break;
+ }
+
+ }
+ }
+
+ public Exception getLastException() {
+ return lastException;
+ }
+ }
+
+
+ ProducerThread t1 = new ProducerThread(jmsTemplate, txManager);
+ ProducerThread t2 = new ProducerThread(jmsTemplate2, txManager2);
+
+ t1.start();
+ t2.start();
+
+ t1.join();
+ t2.join();
+
+ if (t1.getLastException() != null) {
+ Assert.fail("Exception occurred " + t1.getLastException());
+ }
+
+ if (t2.getLastException() != null) {
+ Assert.fail("Exception occurred " + t2.getLastException());
+ }
+
+ }
+
+}
Added: activemq/trunk/activemq-spring/src/test/resources/spring/xa.xml
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/test/resources/spring/xa.xml?rev=1005794&view=auto
==============================================================================
--- activemq/trunk/activemq-spring/src/test/resources/spring/xa.xml (added)
+++ activemq/trunk/activemq-spring/src/test/resources/spring/xa.xml Fri Oct 8
12:08:32 2010
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:context="http://www.springframework.org/schema/context"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:amq="http://activemq.apache.org/schema/core"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+ http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-2.5.xsd
+ http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-2.0.xsd
+ http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+ <!-- broker -->
+
+ <amq:broker brokerName="test" useJmx="false" persistent="false">
+ <amq:transportConnectors>
+ <amq:transportConnector name="transport"
uri="nio://0.0.0.0:61616"/>
+ </amq:transportConnectors>
+ </amq:broker>
+
+ <!-- simple tx -->
+
+ <bean id="jmsTemplate2" class="org.springframework.jms.core.JmsTemplate">
+ <property name="connectionFactory" ref="connectionFactory2"/>
+ </bean>
+
+
+ <bean id="transactionManager2"
class="org.springframework.jms.connection.JmsTransactionManager">
+ <property name="connectionFactory" ref="connectionFactory2"/>
+ </bean>
+
+
+ <bean id="connectionFactory2"
class="org.apache.activemq.ActiveMQConnectionFactory">
+ <property name="brokerURL" value="tcp://localhost:61616"/>
+ <property name="userName" value="smx"/>
+ <property name="password" value="smx"/>
+ </bean>
+
+ <!-- xa tx -->
+
+ <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
+ <property name="connectionFactory" ref="connectionFactory"/>
+ </bean>
+
+ <bean id="transactionManager"
class="org.jencks.factory.TransactionManagerFactoryBean">
+ <property name="defaultTransactionTimeoutSeconds" value="300"/>
+ </bean>
+
+ <bean id="connectionFactory"
class="org.jencks.factory.ConnectionFactoryFactoryBean">
+ <property name="connectionManager" ref="jmsConnectionManager"/>
+ <property name="managedConnectionFactory"
ref="jmsManagedConnectionFactory"/>
+ </bean>
+
+ <bean id="jmsConnectionManager"
class="org.jencks.factory.ConnectionManagerFactoryBean">
+ <property name="transaction" value="xa"/>
+ <property name="transactionManager" ref="transactionManager"/>
+ <property name="poolMaxSize" value="20"/>
+ <property name="connectionTracker">
+ <bean class="org.jencks.factory.ConnectionTrackerFactoryBean">
+ <property name="geronimoTransactionManager"
ref="transactionManager"/>
+ </bean>
+ </property>
+ </bean>
+
+ <bean id="jmsManagedConnectionFactory"
class="org.apache.activemq.ra.ActiveMQManagedConnectionFactory">
+ <property name="resourceAdapter">
+ <bean class="org.apache.activemq.ra.ActiveMQResourceAdapter">
+ <property name="serverUrl" value="tcp://localhost:61616"/>
+ <property name="maximumRedeliveries" value="6"/>
+ <property name="allPrefetchValues" value="1"/>
+ </bean>
+ </property>
+ </bean>
+
+</beans>
\ No newline at end of file