Author: rajdavies
Date: Fri Sep 11 17:10:49 2009
New Revision: 813927
URL: http://svn.apache.org/viewvc?rev=813927&view=rev
Log:
patch applied for https://issues.apache.org/activemq/browse/AMQ-2191
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java?rev=813927&r1=813926&r2=813927&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
Fri Sep 11 17:10:49 2009
@@ -16,6 +16,7 @@
*/
package org.apache.activemq;
+import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -27,11 +28,13 @@
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
+import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.DataArrayResponse;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.IntegerResponse;
import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.Response;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.XATransactionId;
@@ -280,7 +283,7 @@
TransactionInfo info = new TransactionInfo(getConnectionId(),
transactionId, TransactionInfo.COMMIT_ONE_PHASE);
this.transactionId = null;
// Notify the listener that the tx was committed back
- this.connection.syncSendPacket(info);
+ syncSendPacketWithInterruptionHandling(info);
if (localTransactionEventListener != null) {
localTransactionEventListener.commitEvent();
}
@@ -399,7 +402,7 @@
TransactionInfo info = new TransactionInfo(getConnectionId(), x,
TransactionInfo.PREPARE);
// Find out if the server wants to commit or rollback.
- IntegerResponse response =
(IntegerResponse)this.connection.syncSendPacket(info);
+ IntegerResponse response =
(IntegerResponse)syncSendPacketWithInterruptionHandling(info);
return response.getResult();
} catch (JMSException e) {
@@ -433,7 +436,7 @@
// Let the server know that the tx is rollback.
TransactionInfo info = new TransactionInfo(getConnectionId(), x,
TransactionInfo.ROLLBACK);
- this.connection.syncSendPacket(info);
+ syncSendPacketWithInterruptionHandling(info);
List<TransactionContext> l =
ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
if (l != null && !l.isEmpty()) {
@@ -472,7 +475,7 @@
// Notify the server that the tx was committed back
TransactionInfo info = new TransactionInfo(getConnectionId(), x,
onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE);
- this.connection.syncSendPacket(info);
+ syncSendPacketWithInterruptionHandling(info);
List<TransactionContext> l =
ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
if (l != null && !l.isEmpty()) {
@@ -509,7 +512,7 @@
try {
// Tell the server to forget the transaction.
- this.connection.syncSendPacket(info);
+ syncSendPacketWithInterruptionHandling(info);
} catch (JMSException e) {
throw toXAException(e);
}
@@ -601,7 +604,7 @@
if (transactionId != null) {
TransactionInfo info = new TransactionInfo(connectionId,
transactionId, TransactionInfo.END);
try {
- this.connection.syncSendPacket(info);
+ syncSendPacketWithInterruptionHandling(info);
if (LOG.isDebugEnabled()) {
LOG.debug("Ended XA transaction: " + transactionId);
}
@@ -628,6 +631,31 @@
}
/**
+ * Sends the given command. Also sends the command in case of interruption,
+ * so that important commands like rollback and commit are never
interrupted.
+ * If interruption occurred, set the interruption state of the current
+ * after performing the action again.
+ *
+ * @return the response
+ */
+ private Response syncSendPacketWithInterruptionHandling(Command command)
throws JMSException {
+ try {
+ return this.connection.syncSendPacket(command);
+ } catch (JMSException e) {
+ if (e.getLinkedException() instanceof
InterruptedIOException) {
+ try {
+ Thread.interrupted();
+ return
this.connection.syncSendPacket(command);
+ } finally {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ throw e;
+ }
+ }
+
+ /**
* Converts a JMSException from the server to an XAException. if the
* JMSException contained a linked XAException that is returned instead.
*