Author: tabish
Date: Tue May 21 22:18:36 2013
New Revision: 1484999
URL: http://svn.apache.org/r1484999
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-4548
Modified:
activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Modified:
activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1484999&r1=1484998&r2=1484999&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Tue May 21 22:18:36 2013
@@ -594,13 +594,16 @@ public abstract class MessageDatabase ex
}
});
- // rollback any recovered inflight local transactions
+ // rollback any recovered inflight local transactions, and discard any inflight XA transactions.
Set<TransactionId> toRollback = new HashSet<TransactionId>();
+ Set<TransactionId> toDiscard = new HashSet<TransactionId>();
synchronized (inflightTransactions) {
for (Iterator<TransactionId> it =
inflightTransactions.keySet().iterator(); it.hasNext(); ) {
TransactionId id = it.next();
if (id.isLocalTransaction()) {
toRollback.add(id);
+ } else {
+ toDiscard.add(id);
}
}
for (TransactionId tx: toRollback) {
@@ -609,6 +612,12 @@ public abstract class MessageDatabase ex
}
store(new
KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)),
false, null, null);
}
+ for (TransactionId tx: toDiscard) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("discarding recovered in-flight XA transaction " + tx);
+ }
+ inflightTransactions.remove(tx);
+ }
}
synchronized (preparedTransactions) {