Repository: qpid-jms Updated Branches: refs/heads/master 06bb71677 -> f5b8f9fc2
QPIDJMS-125 Some fixes for failing tests, takes into account that we no longer wait before starting a new TX but instead always keep one open. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/f5b8f9fc Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/f5b8f9fc Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/f5b8f9fc Branch: refs/heads/master Commit: f5b8f9fc2ab5c4365aa12af4eaeff2451abe6f02 Parents: 06bb716 Author: Timothy Bish <[email protected]> Authored: Tue Oct 13 15:22:38 2015 -0400 Committer: Timothy Bish <[email protected]> Committed: Tue Oct 13 15:23:04 2015 -0400 ---------------------------------------------------------------------- .../qpid/jms/JmsLocalTransactionContext.java | 91 +++++++++----------- 1 file changed, 43 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f5b8f9fc/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java index 6b252ef..daee929 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java @@ -45,7 +45,7 @@ public class JmsLocalTransactionContext implements JmsTransactionContext { private final List<JmsTxSynchronization> synchronizations = new ArrayList<JmsTxSynchronization>(); private final JmsSession session; private final JmsConnection connection; - private JmsTransactionId transactionId; + private volatile JmsTransactionId transactionId; private volatile boolean failed; private volatile boolean hasWork; private JmsTransactionListener listener; @@ -70,11 +70,13 @@ public class JmsLocalTransactionContext implements JmsTransactionContext { @Override public void onPendingSuccess() { + LOG.trace("TX:{} has performed a send.", getTransactionId()); hasWork = true; } @Override public void onPendingFailure(Throwable cause) { + LOG.trace("TX:{} has a failed send.", getTransactionId()); hasWork = true; } }); @@ -93,11 +95,13 @@ public class JmsLocalTransactionContext implements JmsTransactionContext { @Override public void onPendingSuccess() { + LOG.trace("TX:{} has performed a acknowledge.", getTransactionId()); hasWork = true; } @Override public void onPendingFailure(Throwable cause) { + LOG.trace("TX:{} has failed a acknowledge.", getTransactionId()); hasWork = true; } }); @@ -221,27 +225,27 @@ public class JmsLocalTransactionContext implements JmsTransactionContext { try { LOG.debug("Rollback: {} syncCount: {}", transactionId, (synchronizations != null ? synchronizations.size() : 0)); - connection.rollback(session.getSessionId(), new ProviderSynchronization() { + try { + connection.rollback(session.getSessionId(), new ProviderSynchronization() { - @Override - public void onPendingSuccess() { - reset(); - } + @Override + public void onPendingSuccess() { + reset(); + } - @Override - public void onPendingFailure(Throwable cause) { - } - }); + @Override + public void onPendingFailure(Throwable cause) { + } + }); - if (listener != null) { - try { - listener.onTransactionRolledBack(); - } catch (Throwable error) { - LOG.trace("Local TX listener error ignored: {}", error); + if (listener != null) { + try { + listener.onTransactionRolledBack(); + } catch (Throwable error) { + LOG.trace("Local TX listener error ignored: {}", error); + } } - } - try { afterRollback(); } finally { if (startNewTx) { @@ -273,37 +277,28 @@ public class JmsLocalTransactionContext implements JmsTransactionContext { @Override public void onConnectionRecovery(Provider provider) throws Exception { - if (lock.writeLock().tryLock()) { - try { - if (isInTransaction()) { - LOG.info("onConnectionRecovery starting new transaction to replace old one."); - - // On connection recover we open a new TX to replace the existing one. - transactionId = connection.getNextTransactionId(); - JmsTransactionInfo transaction = new JmsTransactionInfo(session.getSessionId(), transactionId); - ProviderFuture request = new ProviderFuture(); - provider.create(transaction, request); - request.sync(); - - LOG.info("onConnectionRecovery started new transaction to replace old one."); - - // It is ok to use the newly created TX from here if the TX never had any - // work done within it. - if (!hasWork) { - failed = false; - } - } - } finally { - if (lock.writeLock().isHeldByCurrentThread()) { - lock.writeLock().unlock(); - } - } - } else { - // Some transaction work was pending since we could not lock, mark the TX - // as failed so that future calls will fail. - if (transactionId != null) { - LOG.info("onConnectionRecovery TX work pending, marking TX as failed."); - failed = true; + // Attempt to hold the write lock so that new work gets blocked while we take + // care of the recovery, if we can't get it not critical but this prevents + // work from getting queued on the provider needlessly. + lock.writeLock().tryLock(); + try { + LOG.info("onConnectionRecovery starting new transaction to replace old one."); + + // On connection recover we open a new TX to replace the existing one. + transactionId = connection.getNextTransactionId(); + JmsTransactionInfo transaction = new JmsTransactionInfo(session.getSessionId(), transactionId); + ProviderFuture request = new ProviderFuture(); + provider.create(transaction, request); + request.sync(); + + LOG.info("onConnectionRecovery started new transaction to replace old one."); + + // It is ok to use the newly created TX from here if the TX never had any + // work done within it otherwise we want the next commit to fail. + failed = hasWork; + } finally { + if (lock.writeLock().isHeldByCurrentThread()) { + lock.writeLock().unlock(); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
