Author: rajdavies
Date: Fri Sep 11 17:10:49 2009
New Revision: 813927

URL: http://svn.apache.org/viewvc?rev=813927&view=rev
Log:
patch applied for https://issues.apache.org/activemq/browse/AMQ-2191

Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java?rev=813927&r1=813926&r2=813927&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
 Fri Sep 11 17:10:49 2009
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq;
 
+import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -27,11 +28,13 @@
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
+import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.DataArrayResponse;
 import org.apache.activemq.command.DataStructure;
 import org.apache.activemq.command.IntegerResponse;
 import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.Response;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.TransactionInfo;
 import org.apache.activemq.command.XATransactionId;
@@ -280,7 +283,7 @@
             TransactionInfo info = new TransactionInfo(getConnectionId(), 
transactionId, TransactionInfo.COMMIT_ONE_PHASE);
             this.transactionId = null;
             // Notify the listener that the tx was committed back
-            this.connection.syncSendPacket(info);
+            syncSendPacketWithInterruptionHandling(info);
             if (localTransactionEventListener != null) {
                 localTransactionEventListener.commitEvent();
             }
@@ -399,7 +402,7 @@
             TransactionInfo info = new TransactionInfo(getConnectionId(), x, 
TransactionInfo.PREPARE);
 
             // Find out if the server wants to commit or rollback.
-            IntegerResponse response = 
(IntegerResponse)this.connection.syncSendPacket(info);
+            IntegerResponse response = 
(IntegerResponse)syncSendPacketWithInterruptionHandling(info);
             return response.getResult();
 
         } catch (JMSException e) {
@@ -433,7 +436,7 @@
 
             // Let the server know that the tx is rollback.
             TransactionInfo info = new TransactionInfo(getConnectionId(), x, 
TransactionInfo.ROLLBACK);
-            this.connection.syncSendPacket(info);
+            syncSendPacketWithInterruptionHandling(info);
 
             List<TransactionContext> l = 
ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
             if (l != null && !l.isEmpty()) {
@@ -472,7 +475,7 @@
             // Notify the server that the tx was committed back
             TransactionInfo info = new TransactionInfo(getConnectionId(), x, 
onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE);
 
-            this.connection.syncSendPacket(info);
+            syncSendPacketWithInterruptionHandling(info);
 
             List<TransactionContext> l = 
ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
             if (l != null && !l.isEmpty()) {
@@ -509,7 +512,7 @@
 
         try {
             // Tell the server to forget the transaction.
-            this.connection.syncSendPacket(info);
+            syncSendPacketWithInterruptionHandling(info);
         } catch (JMSException e) {
             throw toXAException(e);
         }
@@ -601,7 +604,7 @@
             if (transactionId != null) {
                 TransactionInfo info = new TransactionInfo(connectionId, 
transactionId, TransactionInfo.END);
                 try {
-                    this.connection.syncSendPacket(info);
+                    syncSendPacketWithInterruptionHandling(info);
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Ended XA transaction: " + transactionId);
                     }
@@ -628,6 +631,31 @@
     }
 
     /**
+     * Sends the given command via connection.syncSendPacket. If that fails
+     * with a JMSException whose linked exception is an InterruptedIOException,
+     * the current thread's interrupt flag is cleared and the command is sent
+     * once more, so that important commands like rollback and commit are never
+     * interrupted. The interrupt flag is set again after that second attempt.
+     * @return the response
+     * @throws JMSException if the send fails for another reason or the retry fails
+     */
+    private Response syncSendPacketWithInterruptionHandling(Command command) 
throws JMSException {
+       try {
+                       return this.connection.syncSendPacket(command);
+               } catch (JMSException e) {
+                       if (e.getLinkedException() instanceof 
InterruptedIOException) {
+                               try {
+                                       Thread.interrupted();
+                                       return 
this.connection.syncSendPacket(command);
+                               } finally {
+                                       Thread.currentThread().interrupt();
+                               }                               
+                       }
+                       
+                       throw e;
+               }
+    }
+
+    /**
      * Converts a JMSException from the server to an XAException. if the
      * JMSException contained a linked XAException that is returned instead.
      * 


Reply via email to