Author: tabish
Date: Mon Oct  3 12:43:51 2011
New Revision: 1178398

URL: http://svn.apache.org/viewvc?rev=1178398&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3465

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java?rev=1178398&r1=1178397&r2=1178398&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java Mon Oct  3 12:43:51 2011
@@ -19,8 +19,8 @@ package org.apache.activemq;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
 
 import javax.jms.JMSException;
 import javax.jms.TransactionInProgressException;
@@ -68,7 +68,8 @@ public class TransactionContext implemen
     private static final Logger LOG = 
LoggerFactory.getLogger(TransactionContext.class);
 
     // XATransactionId -> ArrayList of TransactionContext objects
-    private final static ConcurrentHashMap<TransactionId, 
List<TransactionContext>> ENDED_XA_TRANSACTION_CONTEXTS = new 
ConcurrentHashMap<TransactionId, List<TransactionContext>>();
+    private final static HashMap<TransactionId, List<TransactionContext>> 
ENDED_XA_TRANSACTION_CONTEXTS =
+               new HashMap<TransactionId, List<TransactionContext>>();
 
     private final ActiveMQConnection connection;
     private final LongSequenceGenerator localTransactionIdGenerator;
@@ -88,8 +89,21 @@ public class TransactionContext implemen
     }
 
     public boolean isInXATransaction() {
-        return (transactionId != null && transactionId.isXATransaction()) ||
-               (!ENDED_XA_TRANSACTION_CONTEXTS.isEmpty() && 
ENDED_XA_TRANSACTION_CONTEXTS.containsValue(this));
+        if (transactionId != null && transactionId.isXATransaction()) {
+               return true;
+        } else {
+               if (!ENDED_XA_TRANSACTION_CONTEXTS.isEmpty()) {
+                       synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
+                               for(List<TransactionContext> transactions : 
ENDED_XA_TRANSACTION_CONTEXTS.values()) {
+                                       if (transactions.contains(this)) {
+                                               return true;
+                                       }
+                               }
+                       }
+               }
+        }
+
+        return false;
     }
 
     public boolean isInLocalTransaction() {
@@ -437,32 +451,37 @@ public class TransactionContext implemen
             IntegerResponse response = 
(IntegerResponse)syncSendPacketWithInterruptionHandling(info);
             if (XAResource.XA_RDONLY == response.getResult()) {
                 // transaction stops now, may be syncs that need a callback
-                List<TransactionContext> l = 
ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
-                if (l != null && !l.isEmpty()) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("firing afterCommit callbacks on XA_RDONLY 
from prepare: " + xid);
-                    }
-                    for (TransactionContext ctx : l) {
-                        ctx.afterCommit();
-                    }
-                }
+                       synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
+                       List<TransactionContext> l = 
ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
+                       if (l != null && !l.isEmpty()) {
+                           if (LOG.isDebugEnabled()) {
+                               LOG.debug("firing afterCommit callbacks on 
XA_RDONLY from prepare: " + xid);
+                           }
+                           for (TransactionContext ctx : l) {
+                               ctx.afterCommit();
+                           }
+                       }
+                       }
             }
             return response.getResult();
 
         } catch (JMSException e) {
             LOG.warn("prepare of: " + x + " failed with: " + e, e);
-            List<TransactionContext> l = 
ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
-            if (l != null && !l.isEmpty()) {
-                for (TransactionContext ctx : l) {
-                    try {
-                        ctx.afterRollback();
-                    } catch (Throwable ignored) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("failed to firing afterRollback 
callbacks on prepare failure, txid: " + x + ", context: " + ctx, ignored);
-                        }
-                    }
-                }
-            }
+               synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
+                   List<TransactionContext> l = 
ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
+                   if (l != null && !l.isEmpty()) {
+                       for (TransactionContext ctx : l) {
+                           try {
+                               ctx.afterRollback();
+                           } catch (Throwable ignored) {
+                               if (LOG.isDebugEnabled()) {
+                                   LOG.debug("failed to firing afterRollback 
callbacks on prepare failure, txid: " +
+                                                 x + ", context: " + ctx, 
ignored);
+                               }
+                           }
+                       }
+                   }
+               }
             throw toXAException(e);
         }
     }
@@ -495,13 +514,14 @@ public class TransactionContext implemen
             TransactionInfo info = new TransactionInfo(getConnectionId(), x, 
TransactionInfo.ROLLBACK);
             syncSendPacketWithInterruptionHandling(info);
 
-            List<TransactionContext> l = 
ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
-            if (l != null && !l.isEmpty()) {
-                for (TransactionContext ctx : l) {
-                    ctx.afterRollback();
-                }
-            }
-
+               synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
+                   List<TransactionContext> l = 
ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
+                   if (l != null && !l.isEmpty()) {
+                       for (TransactionContext ctx : l) {
+                           ctx.afterRollback();
+                       }
+                   }
+               }
         } catch (JMSException e) {
             throw toXAException(e);
         }
@@ -534,32 +554,36 @@ public class TransactionContext implemen
 
             syncSendPacketWithInterruptionHandling(info);
 
-            List<TransactionContext> l = 
ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
-            if (l != null && !l.isEmpty()) {
-                for (TransactionContext ctx : l) {
-                    try {
-                        ctx.afterCommit();
-                    } catch (Exception ignored) {
-                        LOG.debug("ignoring exception from after completion on 
ended transaction: " + ignored, ignored);
-                    }
-                }
-            }
+               synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
+                   List<TransactionContext> l = 
ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
+                   if (l != null && !l.isEmpty()) {
+                       for (TransactionContext ctx : l) {
+                           try {
+                               ctx.afterCommit();
+                           } catch (Exception ignored) {
+                               LOG.debug("ignoring exception from after 
completion on ended transaction: " + ignored, ignored);
+                           }
+                       }
+                   }
+               }
 
         } catch (JMSException e) {
             LOG.warn("commit of: " + x + " failed with: " + e, e);
             if (onePhase) {
-                List<TransactionContext> l = 
ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
-                if (l != null && !l.isEmpty()) {
-                    for (TransactionContext ctx : l) {
-                        try {
-                            ctx.afterRollback();
-                        } catch (Throwable ignored) {
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("failed to firing afterRollback 
callbacks commit failure, txid: " + x + ", context: " + ctx, ignored);
-                            }
-                        }
-                    }
-                }
+                       synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
+                       List<TransactionContext> l = 
ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
+                       if (l != null && !l.isEmpty()) {
+                           for (TransactionContext ctx : l) {
+                               try {
+                                   ctx.afterRollback();
+                               } catch (Throwable ignored) {
+                                   if (LOG.isDebugEnabled()) {
+                                       LOG.debug("failed to firing 
afterRollback callbacks commit failure, txid: " + x + ", context: " + ctx, 
ignored);
+                                   }
+                               }
+                           }
+                       }
+                       }
             }
             throw toXAException(e);
         }
@@ -592,7 +616,9 @@ public class TransactionContext implemen
         } catch (JMSException e) {
             throw toXAException(e);
         }
-        ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
+       synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
+               ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
+       }
     }
 
     public boolean isSameRM(XAResource xaResource) throws XAException {
@@ -691,14 +717,16 @@ public class TransactionContext implemen
 
                 // Add our self to the list of contexts that are interested in
                 // post commit/rollback events.
-                List<TransactionContext> l = 
ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId);
-                if (l == null) {
-                    l = new ArrayList<TransactionContext>(3);
-                    ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l);
-                    l.add(this);
-                } else if (!l.contains(this)) {
-                    l.add(this);
-                }
+                       synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
+                       List<TransactionContext> l = 
ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId);
+                       if (l == null) {
+                           l = new ArrayList<TransactionContext>(3);
+                           ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l);
+                           l.add(this);
+                       } else if (!l.contains(this)) {
+                           l.add(this);
+                       }
+                       }
             }
 
             // dis-associate


Reply via email to