Author: tabish
Date: Mon Oct 3 12:43:51 2011
New Revision: 1178398
URL: http://svn.apache.org/viewvc?rev=1178398&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3465
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java?rev=1178398&r1=1178397&r2=1178398&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
Mon Oct 3 12:43:51 2011
@@ -19,8 +19,8 @@ package org.apache.activemq;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
import javax.jms.JMSException;
import javax.jms.TransactionInProgressException;
@@ -68,7 +68,8 @@ public class TransactionContext implemen
private static final Logger LOG =
LoggerFactory.getLogger(TransactionContext.class);
// XATransactionId -> ArrayList of TransactionContext objects
- private final static ConcurrentHashMap<TransactionId,
List<TransactionContext>> ENDED_XA_TRANSACTION_CONTEXTS = new
ConcurrentHashMap<TransactionId, List<TransactionContext>>();
+ private final static HashMap<TransactionId, List<TransactionContext>>
ENDED_XA_TRANSACTION_CONTEXTS =
+ new HashMap<TransactionId, List<TransactionContext>>();
private final ActiveMQConnection connection;
private final LongSequenceGenerator localTransactionIdGenerator;
@@ -88,8 +89,21 @@ public class TransactionContext implemen
}
public boolean isInXATransaction() {
- return (transactionId != null && transactionId.isXATransaction()) ||
- (!ENDED_XA_TRANSACTION_CONTEXTS.isEmpty() &&
ENDED_XA_TRANSACTION_CONTEXTS.containsValue(this));
+ if (transactionId != null && transactionId.isXATransaction()) {
+ return true;
+ } else {
+ if (!ENDED_XA_TRANSACTION_CONTEXTS.isEmpty()) {
+ synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
+ for(List<TransactionContext> transactions :
ENDED_XA_TRANSACTION_CONTEXTS.values()) {
+ if (transactions.contains(this)) {
+ return true;
+ }
+ }
+ }
+ }
+ }
+
+ return false;
}
public boolean isInLocalTransaction() {
@@ -437,32 +451,37 @@ public class TransactionContext implemen
IntegerResponse response =
(IntegerResponse)syncSendPacketWithInterruptionHandling(info);
if (XAResource.XA_RDONLY == response.getResult()) {
// transaction stops now, may be syncs that need a callback
- List<TransactionContext> l =
ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
- if (l != null && !l.isEmpty()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("firing afterCommit callbacks on XA_RDONLY
from prepare: " + xid);
- }
- for (TransactionContext ctx : l) {
- ctx.afterCommit();
- }
- }
+ synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
+ List<TransactionContext> l =
ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
+ if (l != null && !l.isEmpty()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("firing afterCommit callbacks on
XA_RDONLY from prepare: " + xid);
+ }
+ for (TransactionContext ctx : l) {
+ ctx.afterCommit();
+ }
+ }
+ }
}
return response.getResult();
} catch (JMSException e) {
LOG.warn("prepare of: " + x + " failed with: " + e, e);
- List<TransactionContext> l =
ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
- if (l != null && !l.isEmpty()) {
- for (TransactionContext ctx : l) {
- try {
- ctx.afterRollback();
- } catch (Throwable ignored) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("failed to firing afterRollback
callbacks on prepare failure, txid: " + x + ", context: " + ctx, ignored);
- }
- }
- }
- }
+ synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
+ List<TransactionContext> l =
ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
+ if (l != null && !l.isEmpty()) {
+ for (TransactionContext ctx : l) {
+ try {
+ ctx.afterRollback();
+ } catch (Throwable ignored) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("failed to firing afterRollback
callbacks on prepare failure, txid: " +
+ x + ", context: " + ctx,
ignored);
+ }
+ }
+ }
+ }
+ }
throw toXAException(e);
}
}
@@ -495,13 +514,14 @@ public class TransactionContext implemen
TransactionInfo info = new TransactionInfo(getConnectionId(), x,
TransactionInfo.ROLLBACK);
syncSendPacketWithInterruptionHandling(info);
- List<TransactionContext> l =
ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
- if (l != null && !l.isEmpty()) {
- for (TransactionContext ctx : l) {
- ctx.afterRollback();
- }
- }
-
+ synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
+ List<TransactionContext> l =
ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
+ if (l != null && !l.isEmpty()) {
+ for (TransactionContext ctx : l) {
+ ctx.afterRollback();
+ }
+ }
+ }
} catch (JMSException e) {
throw toXAException(e);
}
@@ -534,32 +554,36 @@ public class TransactionContext implemen
syncSendPacketWithInterruptionHandling(info);
- List<TransactionContext> l =
ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
- if (l != null && !l.isEmpty()) {
- for (TransactionContext ctx : l) {
- try {
- ctx.afterCommit();
- } catch (Exception ignored) {
- LOG.debug("ignoring exception from after completion on
ended transaction: " + ignored, ignored);
- }
- }
- }
+ synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
+ List<TransactionContext> l =
ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
+ if (l != null && !l.isEmpty()) {
+ for (TransactionContext ctx : l) {
+ try {
+ ctx.afterCommit();
+ } catch (Exception ignored) {
+ LOG.debug("ignoring exception from after
completion on ended transaction: " + ignored, ignored);
+ }
+ }
+ }
+ }
} catch (JMSException e) {
LOG.warn("commit of: " + x + " failed with: " + e, e);
if (onePhase) {
- List<TransactionContext> l =
ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
- if (l != null && !l.isEmpty()) {
- for (TransactionContext ctx : l) {
- try {
- ctx.afterRollback();
- } catch (Throwable ignored) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("failed to firing afterRollback
callbacks commit failure, txid: " + x + ", context: " + ctx, ignored);
- }
- }
- }
- }
+ synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
+ List<TransactionContext> l =
ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
+ if (l != null && !l.isEmpty()) {
+ for (TransactionContext ctx : l) {
+ try {
+ ctx.afterRollback();
+ } catch (Throwable ignored) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("failed to firing
afterRollback callbacks commit failure, txid: " + x + ", context: " + ctx,
ignored);
+ }
+ }
+ }
+ }
+ }
}
throw toXAException(e);
}
@@ -592,7 +616,9 @@ public class TransactionContext implemen
} catch (JMSException e) {
throw toXAException(e);
}
- ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
+ synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
+ ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
+ }
}
public boolean isSameRM(XAResource xaResource) throws XAException {
@@ -691,14 +717,16 @@ public class TransactionContext implemen
// Add our self to the list of contexts that are interested in
// post commit/rollback events.
- List<TransactionContext> l =
ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId);
- if (l == null) {
- l = new ArrayList<TransactionContext>(3);
- ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l);
- l.add(this);
- } else if (!l.contains(this)) {
- l.add(this);
- }
+ synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
+ List<TransactionContext> l =
ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId);
+ if (l == null) {
+ l = new ArrayList<TransactionContext>(3);
+ ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l);
+ l.add(this);
+ } else if (!l.contains(this)) {
+ l.add(this);
+ }
+ }
}
// dis-associate