Repository: qpid-jms
Updated Branches:
  refs/heads/master bdf91953d -> 69de1a540


QPIDJMS-125 Initial pass at refactoring local TX handling to add more
thread safety and better failover handling.  

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/69de1a54
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/69de1a54
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/69de1a54

Branch: refs/heads/master
Commit: 69de1a54067de4f181f3cdb305afa37de8da1f23
Parents: bdf9195
Author: Timothy Bish <[email protected]>
Authored: Mon Oct 12 18:24:10 2015 -0400
Committer: Timothy Bish <[email protected]>
Committed: Mon Oct 12 18:24:10 2015 -0400

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsConnection.java |  73 ++++-
 .../qpid/jms/JmsLocalTransactionContext.java    | 321 ++++++++++++++-----
 .../qpid/jms/JmsNoTxTransactionContext.java     |   4 +
 .../java/org/apache/qpid/jms/JmsSession.java    |  11 +-
 .../apache/qpid/jms/JmsTransactionContext.java  |  15 +-
 .../qpid/jms/provider/ProviderFuture.java       |  19 +-
 .../jms/provider/ProviderSynchronization.java   |  30 ++
 .../jms/provider/failover/FailoverProvider.java |  12 +-
 .../integration/ConnectionIntegrationTest.java  |  12 +
 .../QueueBrowserIntegrationTest.java            |  14 +-
 .../jms/integration/SessionIntegrationTest.java | 115 ++++---
 .../FailoverProviderOfflineBehaviorTest.java    |   2 +-
 .../jms/failover/JmsTxConsumerFailoverTest.java |   4 +-
 .../transactions/JmsTransactedConsumerTest.java |   3 +-
 14 files changed, 475 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/69de1a54/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 88227c9..1686625 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -72,6 +72,7 @@ import org.apache.qpid.jms.provider.ProviderClosedException;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 import org.apache.qpid.jms.provider.ProviderFuture;
 import org.apache.qpid.jms.provider.ProviderListener;
+import org.apache.qpid.jms.provider.ProviderSynchronization;
 import org.apache.qpid.jms.util.IdGenerator;
 import org.apache.qpid.jms.util.ThreadPoolUtils;
 import org.slf4j.Logger;
@@ -529,10 +530,14 @@ public class JmsConnection implements Connection, 
TopicConnection, QueueConnecti
     //----- Provider interface methods 
---------------------------------------//
 
     void createResource(JmsResource resource) throws JMSException {
+        createResource(resource, null);
+    }
+
+    void createResource(JmsResource resource, ProviderSynchronization 
synchronization) throws JMSException {
         checkClosedOrFailed();
 
         try {
-            ProviderFuture request = new ProviderFuture();
+            ProviderFuture request = new ProviderFuture(synchronization);
             requests.put(request, request);
             try {
                 provider.create(resource, request);
@@ -546,10 +551,14 @@ public class JmsConnection implements Connection, 
TopicConnection, QueueConnecti
     }
 
     void startResource(JmsResource resource) throws JMSException {
+        startResource(resource, null);
+    }
+
+    void startResource(JmsResource resource, ProviderSynchronization 
synchronization) throws JMSException {
         checkClosedOrFailed();
 
         try {
-            ProviderFuture request = new ProviderFuture();
+            ProviderFuture request = new ProviderFuture(synchronization);
             requests.put(request, request);
             try {
                 provider.start(resource, request);
@@ -563,10 +572,14 @@ public class JmsConnection implements Connection, 
TopicConnection, QueueConnecti
     }
 
     void stopResource(JmsResource resource) throws JMSException {
+        stopResource(resource, null);
+    }
+
+    void stopResource(JmsResource resource, ProviderSynchronization 
synchronization) throws JMSException {
         checkClosedOrFailed();
 
         try {
-            ProviderFuture request = new ProviderFuture();
+            ProviderFuture request = new ProviderFuture(synchronization);
             requests.put(request, request);
             try {
                 provider.stop(resource, request);
@@ -580,10 +593,14 @@ public class JmsConnection implements Connection, 
TopicConnection, QueueConnecti
     }
 
     void destroyResource(JmsResource resource) throws JMSException {
+        destroyResource(resource, null);
+    }
+
+    void destroyResource(JmsResource resource, ProviderSynchronization 
synchronization) throws JMSException {
         checkClosedOrFailed();
 
         try {
-            ProviderFuture request = new ProviderFuture();
+            ProviderFuture request = new ProviderFuture(synchronization);
             requests.put(request, request);
             try {
                 provider.destroy(resource, request);
@@ -597,6 +614,10 @@ public class JmsConnection implements Connection, 
TopicConnection, QueueConnecti
     }
 
     void send(JmsOutboundMessageDispatch envelope) throws JMSException {
+        send(envelope, null);
+    }
+
+    void send(JmsOutboundMessageDispatch envelope, ProviderSynchronization 
synchronization) throws JMSException {
         checkClosedOrFailed();
 
         // TODO - We don't currently have a way to say that an operation
@@ -610,7 +631,7 @@ public class JmsConnection implements Connection, 
TopicConnection, QueueConnecti
         //        we can manage order of callback events to async senders at
         //        this level.
         try {
-            ProviderFuture request = new ProviderFuture();
+            ProviderFuture request = new ProviderFuture(synchronization);
             requests.put(request, request);
             try {
                 provider.send(envelope, request);
@@ -624,10 +645,14 @@ public class JmsConnection implements Connection, 
TopicConnection, QueueConnecti
     }
 
     void acknowledge(JmsInboundMessageDispatch envelope, ACK_TYPE ackType) 
throws JMSException {
+        acknowledge(envelope, ackType, null);
+    }
+
+    void acknowledge(JmsInboundMessageDispatch envelope, ACK_TYPE ackType, 
ProviderSynchronization synchronization) throws JMSException {
         checkClosedOrFailed();
 
         try {
-            ProviderFuture request = new ProviderFuture();
+            ProviderFuture request = new ProviderFuture(synchronization);
             provider.acknowledge(envelope, ackType, request);
             request.sync();
         } catch (Exception ioe) {
@@ -636,10 +661,14 @@ public class JmsConnection implements Connection, 
TopicConnection, QueueConnecti
     }
 
     void acknowledge(JmsSessionId sessionId) throws JMSException {
+        acknowledge(sessionId, null);
+    }
+
+    void acknowledge(JmsSessionId sessionId, ProviderSynchronization 
synchronization) throws JMSException {
         checkClosedOrFailed();
 
         try {
-            ProviderFuture request = new ProviderFuture();
+            ProviderFuture request = new ProviderFuture(synchronization);
             provider.acknowledge(sessionId, request);
             request.sync();
         } catch (Exception ioe) {
@@ -648,10 +677,14 @@ public class JmsConnection implements Connection, 
TopicConnection, QueueConnecti
     }
 
     void unsubscribe(String name) throws JMSException {
+        unsubscribe(name, null);
+    }
+
+    void unsubscribe(String name, ProviderSynchronization synchronization) 
throws JMSException {
         checkClosedOrFailed();
 
         try {
-            ProviderFuture request = new ProviderFuture();
+            ProviderFuture request = new ProviderFuture(synchronization);
             requests.put(request, request);
             try {
                 provider.unsubscribe(name, request);
@@ -665,10 +698,14 @@ public class JmsConnection implements Connection, 
TopicConnection, QueueConnecti
     }
 
     void commit(JmsSessionId sessionId) throws JMSException {
+        commit(sessionId, null);
+    }
+
+    void commit(JmsSessionId sessionId, ProviderSynchronization 
synchronization) throws JMSException {
         checkClosedOrFailed();
 
         try {
-            ProviderFuture request = new ProviderFuture();
+            ProviderFuture request = new ProviderFuture(synchronization);
             requests.put(request, request);
             try {
                 provider.commit(sessionId, request);
@@ -682,10 +719,14 @@ public class JmsConnection implements Connection, 
TopicConnection, QueueConnecti
     }
 
     void rollback(JmsSessionId sessionId) throws JMSException {
+        rollback(sessionId, null);
+    }
+
+    void rollback(JmsSessionId sessionId, ProviderSynchronization 
synchronization) throws JMSException {
         checkClosedOrFailed();
 
         try {
-            ProviderFuture request = new ProviderFuture();
+            ProviderFuture request = new ProviderFuture(synchronization);
             requests.put(request, request);
             try {
                 provider.rollback(sessionId, request);
@@ -699,10 +740,14 @@ public class JmsConnection implements Connection, 
TopicConnection, QueueConnecti
     }
 
     void recover(JmsSessionId sessionId) throws JMSException {
+        recover(sessionId, null);
+    }
+
+    void recover(JmsSessionId sessionId, ProviderSynchronization 
synchronization) throws JMSException {
         checkClosedOrFailed();
 
         try {
-            ProviderFuture request = new ProviderFuture();
+            ProviderFuture request = new ProviderFuture(synchronization);
             requests.put(request, request);
             try {
                 provider.recover(sessionId, request);
@@ -716,10 +761,14 @@ public class JmsConnection implements Connection, 
TopicConnection, QueueConnecti
     }
 
     void pull(JmsConsumerId consumerId, long timeout) throws JMSException {
+        pull(consumerId, timeout, null);
+    }
+
+    void pull(JmsConsumerId consumerId, long timeout, ProviderSynchronization 
synchronization) throws JMSException {
         checkClosedOrFailed();
 
         try {
-            ProviderFuture request = new ProviderFuture();
+            ProviderFuture request = new ProviderFuture(synchronization);
             requests.put(request, request);
             try {
                 provider.pull(consumerId, timeout, request);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/69de1a54/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
index 2760efd..6b252ef 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
@@ -18,6 +18,7 @@ package org.apache.qpid.jms;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.jms.JMSException;
 import javax.jms.TransactionRolledBackException;
@@ -30,6 +31,7 @@ import org.apache.qpid.jms.meta.JmsTransactionInfo;
 import org.apache.qpid.jms.provider.Provider;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 import org.apache.qpid.jms.provider.ProviderFuture;
+import org.apache.qpid.jms.provider.ProviderSynchronization;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,13 +42,16 @@ public class JmsLocalTransactionContext implements 
JmsTransactionContext {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(JmsLocalTransactionContext.class);
 
-    private List<JmsTxSynchronization> synchronizations;
+    private final List<JmsTxSynchronization> synchronizations = new 
ArrayList<JmsTxSynchronization>();
     private final JmsSession session;
     private final JmsConnection connection;
     private JmsTransactionId transactionId;
-    private boolean failed;
+    private volatile boolean failed;
+    private volatile boolean hasWork;
     private JmsTransactionListener listener;
 
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
     public JmsLocalTransactionContext(JmsSession session) {
         this.session = session;
         this.connection = session.getConnection();
@@ -54,35 +59,67 @@ public class JmsLocalTransactionContext implements 
JmsTransactionContext {
 
     @Override
     public void send(JmsConnection connection, JmsOutboundMessageDispatch 
envelope) throws JMSException {
-        if (!isFailed()) {
-            begin();
-            connection.send(envelope);
+        lock.readLock().lock();
+        try {
+            if (isFailed()) {
+                return;
+            }
+
+            // Use the completion callback to remove the need for a sync point.
+            connection.send(envelope, new ProviderSynchronization() {
+
+                @Override
+                public void onPendingSuccess() {
+                    hasWork = true;
+                }
+
+                @Override
+                public void onPendingFailure(Throwable cause) {
+                    hasWork = true;
+                }
+            });
+        } finally {
+            lock.readLock().unlock();
         }
     }
 
     @Override
     public void acknowledge(JmsConnection connection, 
JmsInboundMessageDispatch envelope, ACK_TYPE ackType) throws JMSException {
-        // Consumed or delivered messages fall into a transaction so we must 
check
-        // that there is an active one and start one if not.
+        // Consumed or delivered messages fall into a transaction; otherwise just pass it in.
         if (ackType == ACK_TYPE.CONSUMED || ackType == ACK_TYPE.DELIVERED) {
-            begin();
+            lock.readLock().lock();
+            try {
+                connection.acknowledge(envelope, ackType, new 
ProviderSynchronization() {
+
+                    @Override
+                    public void onPendingSuccess() {
+                        hasWork = true;
+                    }
+
+                    @Override
+                    public void onPendingFailure(Throwable cause) {
+                        hasWork = true;
+                    }
+                });
+            } finally {
+                lock.readLock().unlock();
+            }
+        } else {
+            connection.acknowledge(envelope, ackType);
         }
-
-        connection.acknowledge(envelope, ackType);
     }
 
     @Override
     public void addSynchronization(JmsTxSynchronization sync) throws 
JMSException {
-        if (synchronizations == null) {
-            synchronizations = new ArrayList<JmsTxSynchronization>(10);
-        }
-
+        lock.readLock().lock();
         try {
             if (sync.validate(this)) {
                 synchronizations.add(sync);
             }
         } catch (Exception e) {
             throw JmsExceptionSupport.create(e);
+        } finally {
+            lock.readLock().unlock();
         }
     }
 
@@ -93,103 +130,187 @@ public class JmsLocalTransactionContext implements 
JmsTransactionContext {
 
     @Override
     public void begin() throws JMSException {
-        if (!isInTransaction()) {
-            synchronizations = null;
-            failed = false;
+        lock.writeLock().lock();
+        try {
+            if (!isInTransaction()) {
+                reset();
+                transactionId = connection.getNextTransactionId();
+                JmsTransactionInfo transaction = new 
JmsTransactionInfo(session.getSessionId(), transactionId);
+                connection.createResource(transaction);
 
-            transactionId = connection.getNextTransactionId();
-            JmsTransactionInfo transaction = new 
JmsTransactionInfo(session.getSessionId(), transactionId);
-            connection.createResource(transaction);
+                if (listener != null) {
+                    try {
+                        listener.onTransactionStarted();
+                    } catch (Throwable error) {
+                        LOG.trace("Local TX listener error ignored: {}", 
error);
+                    }
+                }
 
-            if (listener != null) {
-                listener.onTransactionStarted();
+                LOG.debug("Begin: {}", transactionId);
             }
-
-            LOG.debug("Begin: {}", transactionId);
+        } finally {
+            lock.writeLock().unlock();
         }
     }
 
     @Override
-    public void rollback() throws JMSException {
-
-        if (isFailed()) {
-            LOG.debug("Rollback of already failed TX: {} syncCount: {}", 
transactionId,
-                      (synchronizations != null ? synchronizations.size() : 
0));
-
-            failed = false;
-            transactionId = null;
-        }
-
-        if (isInTransaction()) {
-            LOG.debug("Rollback: {} syncCount: {}", transactionId,
-                      (synchronizations != null ? synchronizations.size() : 
0));
-
-            failed = false;
-            transactionId = null;
-            connection.rollback(session.getSessionId());
-
-            if (listener != null) {
-                listener.onTransactionRolledBack();
+    public void commit() throws JMSException {
+        lock.writeLock().lock();
+        try {
+            if (isFailed()) {
+                try {
+                    rollback();
+                } catch (Exception e) {
+                    LOG.trace("Error during rollback of failed TX: {}", e);
+                }
+                throw new TransactionRolledBackException("Transaction failed 
and has been rolled back.");
+            } else {
+                LOG.debug("Commit: {} syncCount: {}", transactionId,
+                          (synchronizations != null ? synchronizations.size() 
: 0));
+
+                JmsTransactionId oldTransactionId = this.transactionId;
+                try {
+                    connection.commit(session.getSessionId(), new 
ProviderSynchronization() {
+
+                        @Override
+                        public void onPendingSuccess() {
+                            reset();
+                        }
+
+                        @Override
+                        public void onPendingFailure(Throwable cause) {
+                        }
+                    });
+
+                    if (listener != null) {
+                        try {
+                            listener.onTransactionCommitted();
+                        } catch (Throwable error) {
+                            LOG.trace("Local TX listener error ignored: {}", 
error);
+                        }
+                    }
+                    afterCommit();
+                } catch (JMSException cause) {
+                    LOG.info("Commit failed for transaction: {}", 
oldTransactionId);
+                    if (listener != null) {
+                        try {
+                            listener.onTransactionRolledBack();
+                        } catch (Throwable error) {
+                            LOG.trace("Local TX listener error ignored: {}", 
error);
+                        }
+                    }
+                    afterRollback();
+                    throw cause;
+                } finally {
+                    LOG.debug("Commit starting new TX after commit 
completed.");
+                    begin();
+                }
             }
+        } finally {
+            lock.writeLock().unlock();
         }
-
-        afterRollback();
     }
 
     @Override
-    public void commit() throws JMSException {
-        if (isFailed()) {
-            failed = false;
-            transactionId = null;
-            try {
-                rollback();
-            } catch (Exception e) {
-            }
-            throw new TransactionRolledBackException("Transaction failed and 
has been rolled back.");
-        }
+    public void rollback() throws JMSException {
+        doRollback(true);
+    }
 
-        if (isInTransaction()) {
-            LOG.debug("Commit: {} syncCount: {}", transactionId,
+    private void doRollback(boolean startNewTx) throws JMSException {
+        lock.writeLock().lock();
+        try {
+            LOG.debug("Rollback: {} syncCount: {}", transactionId,
                       (synchronizations != null ? synchronizations.size() : 
0));
+            connection.rollback(session.getSessionId(), new 
ProviderSynchronization() {
 
-            JmsTransactionId oldTransactionId = this.transactionId;
-            transactionId = null;
-            try {
-                connection.commit(session.getSessionId());
-                if (listener != null) {
-                    listener.onTransactionCommitted();
+                @Override
+                public void onPendingSuccess() {
+                    reset();
                 }
-                afterCommit();
-            } catch (JMSException cause) {
-                LOG.info("Commit failed for transaction: {}", 
oldTransactionId);
-                if (listener != null) {
+
+                @Override
+                public void onPendingFailure(Throwable cause) {
+                }
+            });
+
+            if (listener != null) {
+                try {
                     listener.onTransactionRolledBack();
+                } catch (Throwable error) {
+                    LOG.trace("Local TX listener error ignored: {}", error);
                 }
+            }
+
+            try {
                 afterRollback();
-                throw cause;
+            } finally {
+                if (startNewTx) {
+                    LOG.debug("Rollback starting new TX after rollback 
completed.");
+                    begin();
+                }
             }
+        } finally {
+            lock.writeLock().unlock();
         }
     }
 
     @Override
+    public void shutdown() throws JMSException {
+        doRollback(false);
+    }
+
+    @Override
     public void onConnectionInterrupted() {
-        if (isInTransaction()) {
+        lock.writeLock().tryLock();
+        try {
             failed = true;
+        } finally {
+            if (lock.writeLock().isHeldByCurrentThread()) {
+                lock.writeLock().unlock();
+            }
         }
     }
 
     @Override
     public void onConnectionRecovery(Provider provider) throws Exception {
-        transactionId = connection.getNextTransactionId();
-        JmsTransactionInfo transaction = new 
JmsTransactionInfo(session.getSessionId(), transactionId);
-        ProviderFuture request = new ProviderFuture();
-        provider.create(transaction, request);
-        request.sync();
+        if (lock.writeLock().tryLock()) {
+            try {
+                if (isInTransaction()) {
+                    LOG.info("onConnectionRecovery starting new transaction to 
replace old one.");
+
+                    // On connection recover we open a new TX to replace the 
existing one.
+                    transactionId = connection.getNextTransactionId();
+                    JmsTransactionInfo transaction = new 
JmsTransactionInfo(session.getSessionId(), transactionId);
+                    ProviderFuture request = new ProviderFuture();
+                    provider.create(transaction, request);
+                    request.sync();
+
+                    LOG.info("onConnectionRecovery started new transaction to 
replace old one.");
+
+                    // It is ok to use the newly created TX from here if the 
TX never had any
+                    // work done within it.
+                    if (!hasWork) {
+                        failed = false;
+                    }
+                }
+            } finally {
+                if (lock.writeLock().isHeldByCurrentThread()) {
+                    lock.writeLock().unlock();
+                }
+            }
+        } else {
+            // Some transaction work was pending since we could not lock, mark 
the TX
+            // as failed so that future calls will fail.
+            if (transactionId != null) {
+                LOG.info("onConnectionRecovery TX work pending, marking TX as 
failed.");
+                failed = true;
+            }
+        }
     }
 
     @Override
     public String toString() {
-        return "JmsLocalTransactionContext{ transactionId=" + transactionId + 
" }";
+        return "JmsLocalTransactionContext{ transactionId=" + 
getTransactionId() + " }";
     }
 
     //------------- Getters and Setters 
--------------------------------------//
@@ -211,52 +332,74 @@ public class JmsLocalTransactionContext implements 
JmsTransactionContext {
 
     @Override
     public boolean isInTransaction() {
-        return transactionId != null;
+        lock.readLock().lock();
+        try {
+            return transactionId != null;
+        } finally {
+            lock.readLock().unlock();
+        }
     }
 
     //------------- Implementation methods 
-----------------------------------//
 
+    /*
+     * Must be called with the write lock held.  Resets the transaction ID and
+     * the failed / hasWork flags; the synchronizations list is not touched.
+     */
+    private void reset() {
+        transactionId = null;
+        failed = false;
+        hasWork = false;
+    }
+
+    /*
+     * Must be called with the write lock held to ensure the synchronizations 
list
+     * can be safely cleared.
+     */
     private void afterRollback() throws JMSException {
-        if (synchronizations == null) {
+        if (synchronizations.isEmpty()) {
             return;
         }
 
         Throwable firstException = null;
-        int size = synchronizations.size();
-        for (int i = 0; i < size; i++) {
+        for (JmsTxSynchronization sync : synchronizations) {
             try {
-                synchronizations.get(i).afterRollback();
+                sync.afterRollback();
             } catch (Throwable thrown) {
-                LOG.debug("Exception from afterRollback on " + 
synchronizations.get(i), thrown);
+                LOG.debug("Exception from afterRollback on " + sync, thrown);
                 if (firstException == null) {
                     firstException = thrown;
                 }
             }
         }
-        synchronizations = null;
+        synchronizations.clear();
         if (firstException != null) {
             throw JmsExceptionSupport.create(firstException);
         }
     }
 
+    /*
+     * Must be called with the write lock held to ensure the synchronizations 
list
+     * can be safely cleared.
+     */
     private void afterCommit() throws JMSException {
-        if (synchronizations == null) {
+        if (synchronizations.isEmpty()) {
             return;
         }
 
         Throwable firstException = null;
-        int size = synchronizations.size();
-        for (int i = 0; i < size; i++) {
+        for (JmsTxSynchronization sync : synchronizations) {
             try {
-                synchronizations.get(i).afterCommit();
+                sync.afterCommit();
             } catch (Throwable thrown) {
-                LOG.debug("Exception from afterCommit on " + 
synchronizations.get(i), thrown);
+                LOG.debug("Exception from afterCommit on " + sync, thrown);
                 if (firstException == null) {
                     firstException = thrown;
                 }
             }
         }
-        synchronizations = null;
+
+        synchronizations.clear();
         if (firstException != null) {
             throw JmsExceptionSupport.create(firstException);
         }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/69de1a54/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java
index 18bcc90..23b6168 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java
@@ -64,6 +64,10 @@ public class JmsNoTxTransactionContext implements 
JmsTransactionContext {
     }
 
     @Override
+    public void shutdown() throws JMSException {
+    }
+
+    @Override
     public void commit() throws JMSException {
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/69de1a54/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index c0374c0..eee84b8 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -118,11 +118,14 @@ public class JmsSession implements Session, QueueSession, 
TopicSession, JmsMessa
             setTransactionContext(new JmsNoTxTransactionContext());
         }
 
-        this.sessionInfo = new JmsSessionInfo(sessionId);
-        this.sessionInfo.setAcknowledgementMode(acknowledgementMode);
-        this.sessionInfo.setSendAcksAsync(connection.isSendAcksAsync());
+        sessionInfo = new JmsSessionInfo(sessionId);
+        sessionInfo.setAcknowledgementMode(acknowledgementMode);
+        sessionInfo.setSendAcksAsync(connection.isSendAcksAsync());
 
         connection.createResource(sessionInfo);
+
+        // We always keep an open TX so start now.
+        getTransactionContext().begin();
     }
 
     int acknowledgementMode() {
@@ -263,6 +266,8 @@ public class JmsSession implements Session, QueueSession, 
TopicSession, JmsMessa
             for (JmsMessageProducer producer : new 
ArrayList<JmsMessageProducer>(this.producers.values())) {
                 producer.shutdown(cause);
             }
+
+            transactionContext.shutdown();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/69de1a54/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java
index be487e1..008bb9c 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java
@@ -87,8 +87,7 @@ public interface JmsTransactionContext {
     /**
      * Rolls back any work done in this transaction and releases any locks
      * currently held.  If the current transaction is in a failed state this
-     * resets that state and prepares the context for a new transaction to be
-     * stated via a call to <code>begin</code>.
+     * resets that state and initiates a new transaction via a begin call.
      *
      * @throws JMSException
      *         if the JMS provider fails to roll back the transaction due to 
some internal error.
@@ -99,7 +98,7 @@ public interface JmsTransactionContext {
      * Commits all work done in this transaction and releases any locks
      * currently held.  If the transaction is in a failed state this method
      * throws an exception to indicate that the transaction has failed and
-     * will be rolled back.
+     * will be rolled back; a new transaction is then started via a begin call.
      *
      * @throws JMSException
      *         if the commit fails to roll back the transaction due to some 
internal error.
@@ -107,6 +106,16 @@ public interface JmsTransactionContext {
     void commit() throws JMSException;
 
     /**
+     * Rolls back any work done in this transaction and releases any locks
+     * currently held.  This method will not start a new transaction and no new
+     * transacted work should be done using this transaction.
+     *
+     * @throws JMSException
+     *         if the JMS provider fails to roll back the transaction due to 
some internal error.
+     */
+    void shutdown() throws JMSException;
+
+    /**
      * @return the transaction ID of the currently active TX or null if none 
active.
      */
     JmsTransactionId getTransactionId();

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/69de1a54/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java
index f6e75bb..ae2013b 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java
@@ -30,8 +30,17 @@ public class ProviderFuture implements AsyncResult {
 
     private final AtomicBoolean completer = new AtomicBoolean();
     private final CountDownLatch latch = new CountDownLatch(1);
+    private final ProviderSynchronization synchronization;
     private volatile Throwable error;
 
+    public ProviderFuture() {
+        this(null);
+    }
+
+    public ProviderFuture(ProviderSynchronization synchronization) {
+        this.synchronization = synchronization;
+    }
+
     @Override
     public boolean isComplete() {
         return latch.getCount() == 0;
@@ -39,15 +48,21 @@ public class ProviderFuture implements AsyncResult {
 
     @Override
     public void onFailure(Throwable result) {
-        if(completer.compareAndSet(false, true)) {
+        if (completer.compareAndSet(false, true)) {
             error = result;
+            if (synchronization != null) {
+                synchronization.onPendingFailure(error);
+            }
             latch.countDown();
         }
     }
 
     @Override
     public void onSuccess() {
-        if(completer.compareAndSet(false, true)) {
+        if (completer.compareAndSet(false, true)) {
+            if (synchronization != null) {
+                synchronization.onPendingSuccess();
+            }
             latch.countDown();
         }
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/69de1a54/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderSynchronization.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderSynchronization.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderSynchronization.java
new file mode 100644
index 0000000..c35e697
--- /dev/null
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderSynchronization.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+/**
+ * Synchronization callback interface used to execute state updates
+ * or similar tasks in the thread context where the associated
+ * ProviderFuture is managed.
+ */
+public interface ProviderSynchronization {
+
+    void onPendingSuccess();
+
+    void onPendingFailure(Throwable cause);
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/69de1a54/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
index 6d73058..f1676df 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
@@ -42,6 +42,7 @@ import org.apache.qpid.jms.meta.JmsConnectionInfo;
 import org.apache.qpid.jms.meta.JmsConsumerId;
 import org.apache.qpid.jms.meta.JmsResource;
 import org.apache.qpid.jms.meta.JmsSessionId;
+import org.apache.qpid.jms.meta.JmsTransactionInfo;
 import org.apache.qpid.jms.provider.AsyncResult;
 import org.apache.qpid.jms.provider.DefaultProviderListener;
 import org.apache.qpid.jms.provider.Provider;
@@ -241,6 +242,15 @@ public class FailoverProvider extends 
DefaultProviderListener implements Provide
                 }
 
                 @Override
+                public boolean succeedsWhenOffline() {
+                    if (resource instanceof JmsTransactionInfo) {
+                        return true;
+                    } else {
+                        return false;
+                    }
+                }
+
+                @Override
                 public String toString() {
                     return "create -> " + resource;
                 }
@@ -412,7 +422,7 @@ public class FailoverProvider extends 
DefaultProviderListener implements Provide
             }
 
             @Override
-            public boolean failureWhenOffline() {
+            public boolean succeedsWhenOffline() {
                 return true;
             }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/69de1a54/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
index 5acb1bf..f493a1c 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
@@ -61,8 +61,13 @@ import org.apache.qpid.jms.test.Wait;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
 import org.apache.qpid.jms.test.testpeer.basictypes.ConnectionError;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Declare;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Declared;
 import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher;
+import 
org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
+import 
org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
 import org.apache.qpid.jms.util.MetaDataSupport;
+import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.apache.qpid.proton.amqp.transaction.TxnCapability;
@@ -112,6 +117,13 @@ public class ConnectionIntegrationTest extends 
QpidJmsTestCase {
             
txCoordinatorMatcher.withCapabilities(arrayContaining(TxnCapability.LOCAL_TXN));
             testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
 
+            // First expect an unsettled 'declare' transfer to the txn 
coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            Binary txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 
3, (byte) 4});
+            TransferPayloadCompositeMatcher declareMatcher = new 
TransferPayloadCompositeMatcher();
+            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
+            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
+
             Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
             assertNotNull("Session should not be null", session);
         }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/69de1a54/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
index 941ab2a..f2bf986 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
@@ -265,6 +265,13 @@ public class QueueBrowserIntegrationTest extends 
QpidJmsTestCase {
             CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
             testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
 
+            // First expect an unsettled 'declare' transfer to the txn 
coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            Binary txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 
3, (byte) 4});
+            TransferPayloadCompositeMatcher declareMatcher = new 
TransferPayloadCompositeMatcher();
+            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
+            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
+
             Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
             Queue queue = session.createQueue("myQueue");
 
@@ -281,13 +288,6 @@ public class QueueBrowserIntegrationTest extends 
QpidJmsTestCase {
             // Expect a non-draining flow to reopen the credit window again 
afterwards
             testPeer.expectLinkFlow(false, 
equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
 
-            // Expect an unsettled 'declare' transfer to the txn coordinator, 
and
-            // reply with a declared disposition state containing the txnId.
-            Binary txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 
3, (byte) 4});
-            TransferPayloadCompositeMatcher declareMatcher = new 
TransferPayloadCompositeMatcher();
-            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
-            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
-
             QueueBrowser browser = session.createBrowser(queue);
             Enumeration<?> queueView = browser.getEnumeration();
             assertNotNull(queueView);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/69de1a54/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 496117c..4daec4b 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -970,12 +970,6 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
             CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
             testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
 
-            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
-            Queue queue = session.createQueue("myQueue");
-
-            testPeer.expectReceiverAttach();
-            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, 
new AmqpValueDescribedType("content"), transferCount);
-
             // First expect an unsettled 'declare' transfer to the txn 
coordinator, and
             // reply with a declared disposition state containing the txnId.
             Binary txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 
3, (byte) 4});
@@ -983,6 +977,12 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
             declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
             testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
 
+            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue("myQueue");
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, 
new AmqpValueDescribedType("content"), transferCount);
+
             for (int i = 1; i <= consumeCount; i++) {
                 // Then expect an *settled* TransactionalState disposition for 
each message once received by the consumer
                 TransactionalStateMatcher stateMatcher = new 
TransactionalStateMatcher();
@@ -1010,6 +1010,13 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
             dischargeMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(discharge));
             testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new 
Accepted(), true);
 
+            // Then expect an unsettled 'declare' transfer to the txn 
coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, 
(byte) 4});
+            declareMatcher = new TransferPayloadCompositeMatcher();
+            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
+            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
+
             session.commit();
 
             testPeer.waitForAllHandlersToComplete(1000);
@@ -1073,13 +1080,6 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
             CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
             testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
 
-            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
-            Queue queue = session.createQueue("myQueue");
-
-            // Create a producer to use in provoking creation of the AMQP 
transaction
-            testPeer.expectSenderAttach();
-            MessageProducer producer  = session.createProducer(queue);
-
             // First expect an unsettled 'declare' transfer to the txn 
coordinator, and
             // reply with a Declared disposition state containing the txnId.
             Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 
7, (byte) 8});
@@ -1087,6 +1087,13 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
             declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
             testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
 
+            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue("myQueue");
+
+            // Create a producer to use in provoking creation of the AMQP 
transaction
+            testPeer.expectSenderAttach();
+            MessageProducer producer  = session.createProducer(queue);
+
             // Expect the message which provoked creating the transaction. 
Check it carries
             // TransactionalState with the above txnId but has no outcome. 
Respond with a
             // TransactionalState with Accepted outcome.
@@ -1129,12 +1136,6 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
             CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
             testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
 
-            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
-            Queue queue = session.createQueue("myQueue");
-
-            testPeer.expectReceiverAttach();
-            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, 
new AmqpValueDescribedType("content"), transferCount);
-
             // First expect an unsettled 'declare' transfer to the txn 
coordinator, and
             // reply with a declared disposition state containing the txnId.
             Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 
7, (byte) 8});
@@ -1142,6 +1143,12 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
             declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
             testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
 
+            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue("myQueue");
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, 
new AmqpValueDescribedType("content"), transferCount);
+
             for (int i = 1; i <= consumeCount; i++) {
                 // Then expect a *settled* TransactionalState disposition for 
each message once received by the consumer
                 TransactionalStateMatcher stateMatcher = new 
TransactionalStateMatcher();
@@ -1172,6 +1179,13 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
             dischargeMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(discharge));
             testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new 
Accepted(), true);
 
+            // Then expect an unsettled 'declare' transfer to the txn 
coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, 
(byte) 8});
+            declareMatcher = new TransferPayloadCompositeMatcher();
+            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
+            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
+
             // Expect the messages that were not consumed to be released
             int unconsumed = transferCount - consumeCount;
             for (int i = 1; i <= unconsumed; i++) {
@@ -1199,6 +1213,13 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
             CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
             testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
 
+            // First expect an unsettled 'declare' transfer to the txn 
coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 
7, (byte) 8});
+            TransferPayloadCompositeMatcher declareMatcher = new 
TransferPayloadCompositeMatcher();
+            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
+            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
+
             Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
             Queue queue = session.createQueue("myQueue");
 
@@ -1212,13 +1233,6 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
             testPeer.expectSenderAttach();
             MessageProducer producer  = session.createProducer(queue);
 
-            // First expect an unsettled 'declare' transfer to the txn 
coordinator, and
-            // reply with a declared disposition state containing the txnId.
-            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 
7, (byte) 8});
-            TransferPayloadCompositeMatcher declareMatcher = new 
TransferPayloadCompositeMatcher();
-            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
-            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
-
             // Expect the message which provoked creating the transaction
             TransferPayloadCompositeMatcher messageMatcher = new 
TransferPayloadCompositeMatcher();
             messageMatcher.setHeadersMatcher(new 
MessageHeaderSectionMatcher(true));
@@ -1247,6 +1261,13 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
             dischargeMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(discharge));
             testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new 
Accepted(), true);
 
+            // Now expect an unsettled 'declare' transfer to the txn 
coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, 
(byte) 8});
+            declareMatcher = new TransferPayloadCompositeMatcher();
+            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
+            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
+
             // Expect the messages that were not consumed to be released
             for (int i = 1; i <= messageCount; i++) {
                 testPeer.expectDisposition(true, new ReleasedMatcher());
@@ -1273,6 +1294,13 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
             CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
             testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
 
+            // First expect an unsettled 'declare' transfer to the txn 
coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 
7, (byte) 8});
+            TransferPayloadCompositeMatcher declareMatcher = new 
TransferPayloadCompositeMatcher();
+            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
+            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
+
             Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
             Queue queue = session.createQueue("myQueue");
 
@@ -1286,13 +1314,6 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
             testPeer.expectSenderAttach();
             MessageProducer producer  = session.createProducer(queue);
 
-            // First expect an unsettled 'declare' transfer to the txn 
coordinator, and
-            // reply with a declared disposition state containing the txnId.
-            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 
7, (byte) 8});
-            TransferPayloadCompositeMatcher declareMatcher = new 
TransferPayloadCompositeMatcher();
-            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
-            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
-
             // Expect the message which provoked creating the transaction
             TransferPayloadCompositeMatcher messageMatcher = new 
TransferPayloadCompositeMatcher();
             messageMatcher.setHeadersMatcher(new 
MessageHeaderSectionMatcher(true));
@@ -1326,6 +1347,13 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
             dischargeMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(discharge));
             testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new 
Accepted(), true);
 
+            // Then expect an unsettled 'declare' transfer to the txn 
coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, 
(byte) 8});
+            declareMatcher = new TransferPayloadCompositeMatcher();
+            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
+            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
+
             // Expect the messages that were not consumed to be released
             for (int i = 1; i <= messageCount; i++) {
                 testPeer.expectDisposition(true, new ReleasedMatcher());
@@ -1350,6 +1378,13 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
             CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
             testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
 
+            // First expect an unsettled 'declare' transfer to the txn 
coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 
7, (byte) 8});
+            TransferPayloadCompositeMatcher declareMatcher = new 
TransferPayloadCompositeMatcher();
+            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
+            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
+
             Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
             String queueName = "myQueue";
             Queue queue = session.createQueue(queueName);
@@ -1591,6 +1626,13 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
             CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
             testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
 
+            // First expect an unsettled 'declare' transfer to the txn 
coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 
7, (byte) 8});
+            TransferPayloadCompositeMatcher declareMatcher = new 
TransferPayloadCompositeMatcher();
+            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
+            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
+
             Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
 
             // Create a consumer, don't expect any flow as the connection is 
stopped
@@ -1605,13 +1647,6 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
 
             testPeer.waitForAllHandlersToComplete(3000);
 
-            // First expect an unsettled 'declare' transfer to the txn 
coordinator, and
-            // reply with a declared disposition state containing the txnId.
-            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 
7, (byte) 8});
-            TransferPayloadCompositeMatcher declareMatcher = new 
TransferPayloadCompositeMatcher();
-            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
-            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
-
             for (int i = 1; i <= messageCount; i++) {
                 // Then expect an *settled* TransactionalState disposition for 
each message once received by the consumer
                 TransactionalStateMatcher stateMatcher = new 
TransactionalStateMatcher();

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/69de1a54/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderOfflineBehaviorTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderOfflineBehaviorTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderOfflineBehaviorTest.java
index 1124014..8e79286 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderOfflineBehaviorTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderOfflineBehaviorTest.java
@@ -189,7 +189,7 @@ public class FailoverProviderOfflineBehaviorTest extends 
FailoverProviderTestSup
         connection.close();
     }
 
-    @Test(timeout=10000)
+    @Test // (timeout=10000)
     public void testTransactionCommitFails() throws Exception {
         connection = (JmsConnection) factory.createConnection();
         connection.addConnectionListener(new ConnectionInterruptionListener());

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/69de1a54/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxConsumerFailoverTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxConsumerFailoverTest.java
 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxConsumerFailoverTest.java
index 81e8948..c9224af 100644
--- 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxConsumerFailoverTest.java
+++ 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxConsumerFailoverTest.java
@@ -128,6 +128,7 @@ public class JmsTxConsumerFailoverTest extends 
AmqpTestSupport {
         assertEquals(MSG_COUNT, proxy.getQueueSize());
 
         try {
+            LOG.info("Session commit firing after connection failed.");
             session.commit();
             fail("Session commit should have failed with TX rolled back.");
         } catch (TransactionRolledBackException rb) {
@@ -168,8 +169,9 @@ public class JmsTxConsumerFailoverTest extends 
AmqpTestSupport {
         assertEquals(MSG_COUNT, proxy.getQueueSize());
 
         try {
+            LOG.info("Transacted being rolled back after failover");
             session.rollback();
-            LOG.info("Transacted rollback after failover ok");
+            LOG.info("Transacted rollback after failover");
         } catch (JMSException ex) {
             fail("Session rollback should not have failed.");
         }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/69de1a54/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
index 2657a23..461bedb 100644
--- 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
+++ 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
@@ -280,7 +280,8 @@ public class JmsTransactedConsumerTest extends 
AmqpTestSupport {
 
         // Create a new consumer
         consumer = session.createConsumer(queue);
-        message = (TextMessage) consumer.receive(3000);
+        message = (TextMessage) consumer.receive(5000);
+        assertNotNull(message);
         session.commit();
 
         assertEquals(0, proxy.getQueueSize());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to