Repository: qpid-jms
Updated Branches:
  refs/heads/master 06bb71677 -> f5b8f9fc2


QPIDJMS-125 Some fixes for failing tests, takes into account that we no
longer wait before starting a new TX but instead always keep one open.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/f5b8f9fc
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/f5b8f9fc
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/f5b8f9fc

Branch: refs/heads/master
Commit: f5b8f9fc2ab5c4365aa12af4eaeff2451abe6f02
Parents: 06bb716
Author: Timothy Bish <[email protected]>
Authored: Tue Oct 13 15:22:38 2015 -0400
Committer: Timothy Bish <[email protected]>
Committed: Tue Oct 13 15:23:04 2015 -0400

----------------------------------------------------------------------
 .../qpid/jms/JmsLocalTransactionContext.java    | 91 +++++++++-----------
 1 file changed, 43 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f5b8f9fc/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
index 6b252ef..daee929 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
@@ -45,7 +45,7 @@ public class JmsLocalTransactionContext implements 
JmsTransactionContext {
     private final List<JmsTxSynchronization> synchronizations = new 
ArrayList<JmsTxSynchronization>();
     private final JmsSession session;
     private final JmsConnection connection;
-    private JmsTransactionId transactionId;
+    private volatile JmsTransactionId transactionId;
     private volatile boolean failed;
     private volatile boolean hasWork;
     private JmsTransactionListener listener;
@@ -70,11 +70,13 @@ public class JmsLocalTransactionContext implements 
JmsTransactionContext {
 
                 @Override
                 public void onPendingSuccess() {
+                    LOG.trace("TX:{} has performed a send.", 
getTransactionId());
                     hasWork = true;
                 }
 
                 @Override
                 public void onPendingFailure(Throwable cause) {
+                    LOG.trace("TX:{} has a failed send.", getTransactionId());
                     hasWork = true;
                 }
             });
@@ -93,11 +95,13 @@ public class JmsLocalTransactionContext implements 
JmsTransactionContext {
 
                     @Override
                     public void onPendingSuccess() {
+                        LOG.trace("TX:{} has performed a acknowledge.", 
getTransactionId());
                         hasWork = true;
                     }
 
                     @Override
                     public void onPendingFailure(Throwable cause) {
+                        LOG.trace("TX:{} has failed a acknowledge.", 
getTransactionId());
                         hasWork = true;
                     }
                 });
@@ -221,27 +225,27 @@ public class JmsLocalTransactionContext implements 
JmsTransactionContext {
         try {
             LOG.debug("Rollback: {} syncCount: {}", transactionId,
                       (synchronizations != null ? synchronizations.size() : 
0));
-            connection.rollback(session.getSessionId(), new 
ProviderSynchronization() {
+            try {
+                connection.rollback(session.getSessionId(), new 
ProviderSynchronization() {
 
-                @Override
-                public void onPendingSuccess() {
-                    reset();
-                }
+                    @Override
+                    public void onPendingSuccess() {
+                        reset();
+                    }
 
-                @Override
-                public void onPendingFailure(Throwable cause) {
-                }
-            });
+                    @Override
+                    public void onPendingFailure(Throwable cause) {
+                    }
+                });
 
-            if (listener != null) {
-                try {
-                    listener.onTransactionRolledBack();
-                } catch (Throwable error) {
-                    LOG.trace("Local TX listener error ignored: {}", error);
+                if (listener != null) {
+                    try {
+                        listener.onTransactionRolledBack();
+                    } catch (Throwable error) {
+                        LOG.trace("Local TX listener error ignored: {}", 
error);
+                    }
                 }
-            }
 
-            try {
                 afterRollback();
             } finally {
                 if (startNewTx) {
@@ -273,37 +277,28 @@ public class JmsLocalTransactionContext implements 
JmsTransactionContext {
 
     @Override
     public void onConnectionRecovery(Provider provider) throws Exception {
-        if (lock.writeLock().tryLock()) {
-            try {
-                if (isInTransaction()) {
-                    LOG.info("onConnectionRecovery starting new transaction to 
replace old one.");
-
-                    // On connection recover we open a new TX to replace the 
existing one.
-                    transactionId = connection.getNextTransactionId();
-                    JmsTransactionInfo transaction = new 
JmsTransactionInfo(session.getSessionId(), transactionId);
-                    ProviderFuture request = new ProviderFuture();
-                    provider.create(transaction, request);
-                    request.sync();
-
-                    LOG.info("onConnectionRecovery started new transaction to 
replace old one.");
-
-                    // It is ok to use the newly created TX from here if the 
TX never had any
-                    // work done within it.
-                    if (!hasWork) {
-                        failed = false;
-                    }
-                }
-            } finally {
-                if (lock.writeLock().isHeldByCurrentThread()) {
-                    lock.writeLock().unlock();
-                }
-            }
-        } else {
-            // Some transaction work was pending since we could not lock, mark 
the TX
-            // as failed so that future calls will fail.
-            if (transactionId != null) {
-                LOG.info("onConnectionRecovery TX work pending, marking TX as 
failed.");
-                failed = true;
+        // Attempt to hold the write lock so that new work gets blocked while 
we take
+        // care of the recovery, if we can't get it not critical but this 
prevents
+        // work from getting queued on the provider needlessly.
+        lock.writeLock().tryLock();
+        try {
+            LOG.info("onConnectionRecovery starting new transaction to replace 
old one.");
+
+            // On connection recover we open a new TX to replace the existing 
one.
+            transactionId = connection.getNextTransactionId();
+            JmsTransactionInfo transaction = new 
JmsTransactionInfo(session.getSessionId(), transactionId);
+            ProviderFuture request = new ProviderFuture();
+            provider.create(transaction, request);
+            request.sync();
+
+            LOG.info("onConnectionRecovery started new transaction to replace 
old one.");
+
+            // It is ok to use the newly created TX from here if the TX never 
had any
+            // work done within it otherwise we want the next commit to fail.
+            failed = hasWork;
+        } finally {
+            if (lock.writeLock().isHeldByCurrentThread()) {
+                lock.writeLock().unlock();
             }
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to