Repository: qpid-jms Updated Branches: refs/heads/master bdf91953d -> 69de1a540
QPIDJMS-125 Initial pass at refactoring local TX handling to add more thread safety and better failover handling. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/69de1a54 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/69de1a54 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/69de1a54 Branch: refs/heads/master Commit: 69de1a54067de4f181f3cdb305afa37de8da1f23 Parents: bdf9195 Author: Timothy Bish <[email protected]> Authored: Mon Oct 12 18:24:10 2015 -0400 Committer: Timothy Bish <[email protected]> Committed: Mon Oct 12 18:24:10 2015 -0400 ---------------------------------------------------------------------- .../java/org/apache/qpid/jms/JmsConnection.java | 73 ++++- .../qpid/jms/JmsLocalTransactionContext.java | 321 ++++++++++++++----- .../qpid/jms/JmsNoTxTransactionContext.java | 4 + .../java/org/apache/qpid/jms/JmsSession.java | 11 +- .../apache/qpid/jms/JmsTransactionContext.java | 15 +- .../qpid/jms/provider/ProviderFuture.java | 19 +- .../jms/provider/ProviderSynchronization.java | 30 ++ .../jms/provider/failover/FailoverProvider.java | 12 +- .../integration/ConnectionIntegrationTest.java | 12 + .../QueueBrowserIntegrationTest.java | 14 +- .../jms/integration/SessionIntegrationTest.java | 115 ++++--- .../FailoverProviderOfflineBehaviorTest.java | 2 +- .../jms/failover/JmsTxConsumerFailoverTest.java | 4 +- .../transactions/JmsTransactedConsumerTest.java | 3 +- 14 files changed, 475 insertions(+), 160 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/69de1a54/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java index 88227c9..1686625 100644 --- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java @@ -72,6 +72,7 @@ import org.apache.qpid.jms.provider.ProviderClosedException; import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE; import org.apache.qpid.jms.provider.ProviderFuture; import org.apache.qpid.jms.provider.ProviderListener; +import org.apache.qpid.jms.provider.ProviderSynchronization; import org.apache.qpid.jms.util.IdGenerator; import org.apache.qpid.jms.util.ThreadPoolUtils; import org.slf4j.Logger; @@ -529,10 +530,14 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti //----- Provider interface methods ---------------------------------------// void createResource(JmsResource resource) throws JMSException { + createResource(resource, null); + } + + void createResource(JmsResource resource, ProviderSynchronization synchronization) throws JMSException { checkClosedOrFailed(); try { - ProviderFuture request = new ProviderFuture(); + ProviderFuture request = new ProviderFuture(synchronization); requests.put(request, request); try { provider.create(resource, request); @@ -546,10 +551,14 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti } void startResource(JmsResource resource) throws JMSException { + startResource(resource, null); + } + + void startResource(JmsResource resource, ProviderSynchronization synchronization) throws JMSException { checkClosedOrFailed(); try { - ProviderFuture request = new ProviderFuture(); + ProviderFuture request = new ProviderFuture(synchronization); requests.put(request, request); try { provider.start(resource, request); @@ -563,10 +572,14 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti } void stopResource(JmsResource resource) throws JMSException { + stopResource(resource, null); + } + + void stopResource(JmsResource resource, ProviderSynchronization synchronization) 
throws JMSException { checkClosedOrFailed(); try { - ProviderFuture request = new ProviderFuture(); + ProviderFuture request = new ProviderFuture(synchronization); requests.put(request, request); try { provider.stop(resource, request); @@ -580,10 +593,14 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti } void destroyResource(JmsResource resource) throws JMSException { + destroyResource(resource, null); + } + + void destroyResource(JmsResource resource, ProviderSynchronization synchronization) throws JMSException { checkClosedOrFailed(); try { - ProviderFuture request = new ProviderFuture(); + ProviderFuture request = new ProviderFuture(synchronization); requests.put(request, request); try { provider.destroy(resource, request); @@ -597,6 +614,10 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti } void send(JmsOutboundMessageDispatch envelope) throws JMSException { + send(envelope, null); + } + + void send(JmsOutboundMessageDispatch envelope, ProviderSynchronization synchronization) throws JMSException { checkClosedOrFailed(); // TODO - We don't currently have a way to say that an operation @@ -610,7 +631,7 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti // we can manage order of callback events to async senders at // this level. 
try { - ProviderFuture request = new ProviderFuture(); + ProviderFuture request = new ProviderFuture(synchronization); requests.put(request, request); try { provider.send(envelope, request); @@ -624,10 +645,14 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti } void acknowledge(JmsInboundMessageDispatch envelope, ACK_TYPE ackType) throws JMSException { + acknowledge(envelope, ackType, null); + } + + void acknowledge(JmsInboundMessageDispatch envelope, ACK_TYPE ackType, ProviderSynchronization synchronization) throws JMSException { checkClosedOrFailed(); try { - ProviderFuture request = new ProviderFuture(); + ProviderFuture request = new ProviderFuture(synchronization); provider.acknowledge(envelope, ackType, request); request.sync(); } catch (Exception ioe) { @@ -636,10 +661,14 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti } void acknowledge(JmsSessionId sessionId) throws JMSException { + acknowledge(sessionId, null); + } + + void acknowledge(JmsSessionId sessionId, ProviderSynchronization synchronization) throws JMSException { checkClosedOrFailed(); try { - ProviderFuture request = new ProviderFuture(); + ProviderFuture request = new ProviderFuture(synchronization); provider.acknowledge(sessionId, request); request.sync(); } catch (Exception ioe) { @@ -648,10 +677,14 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti } void unsubscribe(String name) throws JMSException { + unsubscribe(name, null); + } + + void unsubscribe(String name, ProviderSynchronization synchronization) throws JMSException { checkClosedOrFailed(); try { - ProviderFuture request = new ProviderFuture(); + ProviderFuture request = new ProviderFuture(synchronization); requests.put(request, request); try { provider.unsubscribe(name, request); @@ -665,10 +698,14 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti } void commit(JmsSessionId sessionId) throws 
JMSException { + commit(sessionId, null); + } + + void commit(JmsSessionId sessionId, ProviderSynchronization synchronization) throws JMSException { checkClosedOrFailed(); try { - ProviderFuture request = new ProviderFuture(); + ProviderFuture request = new ProviderFuture(synchronization); requests.put(request, request); try { provider.commit(sessionId, request); @@ -682,10 +719,14 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti } void rollback(JmsSessionId sessionId) throws JMSException { + rollback(sessionId, null); + } + + void rollback(JmsSessionId sessionId, ProviderSynchronization synchronization) throws JMSException { checkClosedOrFailed(); try { - ProviderFuture request = new ProviderFuture(); + ProviderFuture request = new ProviderFuture(synchronization); requests.put(request, request); try { provider.rollback(sessionId, request); @@ -699,10 +740,14 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti } void recover(JmsSessionId sessionId) throws JMSException { + recover(sessionId, null); + } + + void recover(JmsSessionId sessionId, ProviderSynchronization synchronization) throws JMSException { checkClosedOrFailed(); try { - ProviderFuture request = new ProviderFuture(); + ProviderFuture request = new ProviderFuture(synchronization); requests.put(request, request); try { provider.recover(sessionId, request); @@ -716,10 +761,14 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti } void pull(JmsConsumerId consumerId, long timeout) throws JMSException { + pull(consumerId, timeout, null); + } + + void pull(JmsConsumerId consumerId, long timeout, ProviderSynchronization synchronization) throws JMSException { checkClosedOrFailed(); try { - ProviderFuture request = new ProviderFuture(); + ProviderFuture request = new ProviderFuture(synchronization); requests.put(request, request); try { provider.pull(consumerId, timeout, request); 
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/69de1a54/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java index 2760efd..6b252ef 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java @@ -18,6 +18,7 @@ package org.apache.qpid.jms; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.jms.JMSException; import javax.jms.TransactionRolledBackException; @@ -30,6 +31,7 @@ import org.apache.qpid.jms.meta.JmsTransactionInfo; import org.apache.qpid.jms.provider.Provider; import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE; import org.apache.qpid.jms.provider.ProviderFuture; +import org.apache.qpid.jms.provider.ProviderSynchronization; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,13 +42,16 @@ public class JmsLocalTransactionContext implements JmsTransactionContext { private static final Logger LOG = LoggerFactory.getLogger(JmsLocalTransactionContext.class); - private List<JmsTxSynchronization> synchronizations; + private final List<JmsTxSynchronization> synchronizations = new ArrayList<JmsTxSynchronization>(); private final JmsSession session; private final JmsConnection connection; private JmsTransactionId transactionId; - private boolean failed; + private volatile boolean failed; + private volatile boolean hasWork; private JmsTransactionListener listener; + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + public JmsLocalTransactionContext(JmsSession session) { this.session = session; this.connection = session.getConnection(); @@ -54,35 
+59,67 @@ public class JmsLocalTransactionContext implements JmsTransactionContext { @Override public void send(JmsConnection connection, JmsOutboundMessageDispatch envelope) throws JMSException { - if (!isFailed()) { - begin(); - connection.send(envelope); + lock.readLock().lock(); + try { + if (isFailed()) { + return; + } + + // Use the completion callback to remove the need for a sync point. + connection.send(envelope, new ProviderSynchronization() { + + @Override + public void onPendingSuccess() { + hasWork = true; + } + + @Override + public void onPendingFailure(Throwable cause) { + hasWork = true; + } + }); + } finally { + lock.readLock().unlock(); } } @Override public void acknowledge(JmsConnection connection, JmsInboundMessageDispatch envelope, ACK_TYPE ackType) throws JMSException { - // Consumed or delivered messages fall into a transaction so we must check - // that there is an active one and start one if not. + // Consumed or delivered messages fall into a transaction otherwise just pass it in. 
if (ackType == ACK_TYPE.CONSUMED || ackType == ACK_TYPE.DELIVERED) { - begin(); + lock.readLock().lock(); + try { + connection.acknowledge(envelope, ackType, new ProviderSynchronization() { + + @Override + public void onPendingSuccess() { + hasWork = true; + } + + @Override + public void onPendingFailure(Throwable cause) { + hasWork = true; + } + }); + } finally { + lock.readLock().unlock(); + } + } else { + connection.acknowledge(envelope, ackType); } - - connection.acknowledge(envelope, ackType); } @Override public void addSynchronization(JmsTxSynchronization sync) throws JMSException { - if (synchronizations == null) { - synchronizations = new ArrayList<JmsTxSynchronization>(10); - } - + lock.readLock().lock(); try { if (sync.validate(this)) { synchronizations.add(sync); } } catch (Exception e) { throw JmsExceptionSupport.create(e); + } finally { + lock.readLock().unlock(); } } @@ -93,103 +130,187 @@ public class JmsLocalTransactionContext implements JmsTransactionContext { @Override public void begin() throws JMSException { - if (!isInTransaction()) { - synchronizations = null; - failed = false; + lock.writeLock().lock(); + try { + if (!isInTransaction()) { + reset(); + transactionId = connection.getNextTransactionId(); + JmsTransactionInfo transaction = new JmsTransactionInfo(session.getSessionId(), transactionId); + connection.createResource(transaction); - transactionId = connection.getNextTransactionId(); - JmsTransactionInfo transaction = new JmsTransactionInfo(session.getSessionId(), transactionId); - connection.createResource(transaction); + if (listener != null) { + try { + listener.onTransactionStarted(); + } catch (Throwable error) { + LOG.trace("Local TX listener error ignored: {}", error); + } + } - if (listener != null) { - listener.onTransactionStarted(); + LOG.debug("Begin: {}", transactionId); } - - LOG.debug("Begin: {}", transactionId); + } finally { + lock.writeLock().unlock(); } } @Override - public void rollback() throws JMSException { - - 
if (isFailed()) { - LOG.debug("Rollback of already failed TX: {} syncCount: {}", transactionId, - (synchronizations != null ? synchronizations.size() : 0)); - - failed = false; - transactionId = null; - } - - if (isInTransaction()) { - LOG.debug("Rollback: {} syncCount: {}", transactionId, - (synchronizations != null ? synchronizations.size() : 0)); - - failed = false; - transactionId = null; - connection.rollback(session.getSessionId()); - - if (listener != null) { - listener.onTransactionRolledBack(); + public void commit() throws JMSException { + lock.writeLock().lock(); + try { + if (isFailed()) { + try { + rollback(); + } catch (Exception e) { + LOG.trace("Error during rollback of failed TX: {}", e); + } + throw new TransactionRolledBackException("Transaction failed and has been rolled back."); + } else { + LOG.debug("Commit: {} syncCount: {}", transactionId, + (synchronizations != null ? synchronizations.size() : 0)); + + JmsTransactionId oldTransactionId = this.transactionId; + try { + connection.commit(session.getSessionId(), new ProviderSynchronization() { + + @Override + public void onPendingSuccess() { + reset(); + } + + @Override + public void onPendingFailure(Throwable cause) { + } + }); + + if (listener != null) { + try { + listener.onTransactionCommitted(); + } catch (Throwable error) { + LOG.trace("Local TX listener error ignored: {}", error); + } + } + afterCommit(); + } catch (JMSException cause) { + LOG.info("Commit failed for transaction: {}", oldTransactionId); + if (listener != null) { + try { + listener.onTransactionRolledBack(); + } catch (Throwable error) { + LOG.trace("Local TX listener error ignored: {}", error); + } + } + afterRollback(); + throw cause; + } finally { + LOG.debug("Commit starting new TX after commit completed."); + begin(); + } } + } finally { + lock.writeLock().unlock(); } - - afterRollback(); } @Override - public void commit() throws JMSException { - if (isFailed()) { - failed = false; - transactionId = null; - try { - 
rollback(); - } catch (Exception e) { - } - throw new TransactionRolledBackException("Transaction failed and has been rolled back."); - } + public void rollback() throws JMSException { + doRollback(true); + } - if (isInTransaction()) { - LOG.debug("Commit: {} syncCount: {}", transactionId, + private void doRollback(boolean startNewTx) throws JMSException { + lock.writeLock().lock(); + try { + LOG.debug("Rollback: {} syncCount: {}", transactionId, (synchronizations != null ? synchronizations.size() : 0)); + connection.rollback(session.getSessionId(), new ProviderSynchronization() { - JmsTransactionId oldTransactionId = this.transactionId; - transactionId = null; - try { - connection.commit(session.getSessionId()); - if (listener != null) { - listener.onTransactionCommitted(); + @Override + public void onPendingSuccess() { + reset(); } - afterCommit(); - } catch (JMSException cause) { - LOG.info("Commit failed for transaction: {}", oldTransactionId); - if (listener != null) { + + @Override + public void onPendingFailure(Throwable cause) { + } + }); + + if (listener != null) { + try { listener.onTransactionRolledBack(); + } catch (Throwable error) { + LOG.trace("Local TX listener error ignored: {}", error); } + } + + try { afterRollback(); - throw cause; + } finally { + if (startNewTx) { + LOG.debug("Rollback starting new TX after rollback completed."); + begin(); + } } + } finally { + lock.writeLock().unlock(); } } @Override + public void shutdown() throws JMSException { + doRollback(false); + } + + @Override public void onConnectionInterrupted() { - if (isInTransaction()) { + lock.writeLock().tryLock(); + try { failed = true; + } finally { + if (lock.writeLock().isHeldByCurrentThread()) { + lock.writeLock().unlock(); + } } } @Override public void onConnectionRecovery(Provider provider) throws Exception { - transactionId = connection.getNextTransactionId(); - JmsTransactionInfo transaction = new JmsTransactionInfo(session.getSessionId(), transactionId); - 
ProviderFuture request = new ProviderFuture(); - provider.create(transaction, request); - request.sync(); + if (lock.writeLock().tryLock()) { + try { + if (isInTransaction()) { + LOG.info("onConnectionRecovery starting new transaction to replace old one."); + + // On connection recover we open a new TX to replace the existing one. + transactionId = connection.getNextTransactionId(); + JmsTransactionInfo transaction = new JmsTransactionInfo(session.getSessionId(), transactionId); + ProviderFuture request = new ProviderFuture(); + provider.create(transaction, request); + request.sync(); + + LOG.info("onConnectionRecovery started new transaction to replace old one."); + + // It is ok to use the newly created TX from here if the TX never had any + // work done within it. + if (!hasWork) { + failed = false; + } + } + } finally { + if (lock.writeLock().isHeldByCurrentThread()) { + lock.writeLock().unlock(); + } + } + } else { + // Some transaction work was pending since we could not lock, mark the TX + // as failed so that future calls will fail. + if (transactionId != null) { + LOG.info("onConnectionRecovery TX work pending, marking TX as failed."); + failed = true; + } + } } @Override public String toString() { - return "JmsLocalTransactionContext{ transactionId=" + transactionId + " }"; + return "JmsLocalTransactionContext{ transactionId=" + getTransactionId() + " }"; } //------------- Getters and Setters --------------------------------------// @@ -211,52 +332,74 @@ public class JmsLocalTransactionContext implements JmsTransactionContext { @Override public boolean isInTransaction() { - return transactionId != null; + lock.readLock().lock(); + try { + return transactionId != null; + } finally { + lock.readLock().unlock(); + } } //------------- Implementation methods -----------------------------------// + /* + * Must be called with the write lock held to ensure the synchronizations list + * can be safely cleared. 
+ */ + private void reset() { + transactionId = null; + failed = false; + hasWork = false; + } + + /* + * Must be called with the write lock held to ensure the synchronizations list + * can be safely cleared. + */ private void afterRollback() throws JMSException { - if (synchronizations == null) { + if (synchronizations.isEmpty()) { return; } Throwable firstException = null; - int size = synchronizations.size(); - for (int i = 0; i < size; i++) { + for (JmsTxSynchronization sync : synchronizations) { try { - synchronizations.get(i).afterRollback(); + sync.afterRollback(); } catch (Throwable thrown) { - LOG.debug("Exception from afterRollback on " + synchronizations.get(i), thrown); + LOG.debug("Exception from afterRollback on " + sync, thrown); if (firstException == null) { firstException = thrown; } } } - synchronizations = null; + synchronizations.clear(); if (firstException != null) { throw JmsExceptionSupport.create(firstException); } } + /* + * Must be called with the write lock held to ensure the synchronizations list + * can be safely cleared. 
+ */ private void afterCommit() throws JMSException { - if (synchronizations == null) { + if (synchronizations.isEmpty()) { return; } Throwable firstException = null; - int size = synchronizations.size(); - for (int i = 0; i < size; i++) { + for (JmsTxSynchronization sync : synchronizations) { try { - synchronizations.get(i).afterCommit(); + sync.afterCommit(); } catch (Throwable thrown) { - LOG.debug("Exception from afterCommit on " + synchronizations.get(i), thrown); + LOG.debug("Exception from afterCommit on " + sync, thrown); if (firstException == null) { firstException = thrown; } } } - synchronizations = null; + + synchronizations.clear(); if (firstException != null) { throw JmsExceptionSupport.create(firstException); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/69de1a54/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java index 18bcc90..23b6168 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java @@ -64,6 +64,10 @@ public class JmsNoTxTransactionContext implements JmsTransactionContext { } @Override + public void shutdown() throws JMSException { + } + + @Override public void commit() throws JMSException { } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/69de1a54/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java index c0374c0..eee84b8 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java +++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java @@ -118,11 +118,14 @@ public class JmsSession implements Session, QueueSession, TopicSession, JmsMessa setTransactionContext(new JmsNoTxTransactionContext()); } - this.sessionInfo = new JmsSessionInfo(sessionId); - this.sessionInfo.setAcknowledgementMode(acknowledgementMode); - this.sessionInfo.setSendAcksAsync(connection.isSendAcksAsync()); + sessionInfo = new JmsSessionInfo(sessionId); + sessionInfo.setAcknowledgementMode(acknowledgementMode); + sessionInfo.setSendAcksAsync(connection.isSendAcksAsync()); connection.createResource(sessionInfo); + + // We always keep an open TX so start now. + getTransactionContext().begin(); } int acknowledgementMode() { @@ -263,6 +266,8 @@ public class JmsSession implements Session, QueueSession, TopicSession, JmsMessa for (JmsMessageProducer producer : new ArrayList<JmsMessageProducer>(this.producers.values())) { producer.shutdown(cause); } + + transactionContext.shutdown(); } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/69de1a54/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java index be487e1..008bb9c 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java @@ -87,8 +87,7 @@ public interface JmsTransactionContext { /** * Rolls back any work done in this transaction and releases any locks * currently held. If the current transaction is in a failed state this - * resets that state and prepares the context for a new transaction to be - * stated via a call to <code>begin</code>. + * resets that state and initiates a new transaction via a begin call. 
* * @throws JMSException * if the JMS provider fails to roll back the transaction due to some internal error. @@ -99,7 +98,7 @@ public interface JmsTransactionContext { * Commits all work done in this transaction and releases any locks * currently held. If the transaction is in a failed state this method * throws an exception to indicate that the transaction has failed and - will be rolled back. + will be rolled back, after which a new transaction is started via a begin call. * * @throws JMSException * if the commit fails to roll back the transaction due to some internal error. @@ -107,6 +106,16 @@ public interface JmsTransactionContext { void commit() throws JMSException; /** + * Rolls back any work done in this transaction and releases any locks + * currently held. This method will not start a new transaction and no new + * transacted work should be done using this transaction. + * + * @throws JMSException + * if the JMS provider fails to roll back the transaction due to some internal error. + */ + void shutdown() throws JMSException; + + /** * @return the transaction ID of the currently active TX or null if none active. 
*/ JmsTransactionId getTransactionId(); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/69de1a54/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java index f6e75bb..ae2013b 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java @@ -30,8 +30,17 @@ public class ProviderFuture implements AsyncResult { private final AtomicBoolean completer = new AtomicBoolean(); private final CountDownLatch latch = new CountDownLatch(1); + private final ProviderSynchronization synchronization; private volatile Throwable error; + public ProviderFuture() { + this(null); + } + + public ProviderFuture(ProviderSynchronization synchronization) { + this.synchronization = synchronization; + } + @Override public boolean isComplete() { return latch.getCount() == 0; @@ -39,15 +48,21 @@ public class ProviderFuture implements AsyncResult { @Override public void onFailure(Throwable result) { - if(completer.compareAndSet(false, true)) { + if (completer.compareAndSet(false, true)) { error = result; + if (synchronization != null) { + synchronization.onPendingFailure(error); + } latch.countDown(); } } @Override public void onSuccess() { - if(completer.compareAndSet(false, true)) { + if (completer.compareAndSet(false, true)) { + if (synchronization != null) { + synchronization.onPendingSuccess(); + } latch.countDown(); } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/69de1a54/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderSynchronization.java ---------------------------------------------------------------------- diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderSynchronization.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderSynchronization.java new file mode 100644 index 0000000..c35e697 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderSynchronization.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider; + +/** + * Synchronization callback interface used to execute state updates + * or similar tasks in the thread context where the associated + * ProviderFuture is managed. 
+ */ +public interface ProviderSynchronization { + + void onPendingSuccess(); + + void onPendingFailure(Throwable cause); + +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/69de1a54/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java index 6d73058..f1676df 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java @@ -42,6 +42,7 @@ import org.apache.qpid.jms.meta.JmsConnectionInfo; import org.apache.qpid.jms.meta.JmsConsumerId; import org.apache.qpid.jms.meta.JmsResource; import org.apache.qpid.jms.meta.JmsSessionId; +import org.apache.qpid.jms.meta.JmsTransactionInfo; import org.apache.qpid.jms.provider.AsyncResult; import org.apache.qpid.jms.provider.DefaultProviderListener; import org.apache.qpid.jms.provider.Provider; @@ -241,6 +242,15 @@ public class FailoverProvider extends DefaultProviderListener implements Provide } @Override + public boolean succeedsWhenOffline() { + if (resource instanceof JmsTransactionInfo) { + return true; + } else { + return false; + } + } + + @Override public String toString() { return "create -> " + resource; } @@ -412,7 +422,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide } @Override - public boolean failureWhenOffline() { + public boolean succeedsWhenOffline() { return true; } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/69de1a54/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java ---------------------------------------------------------------------- diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java index 5acb1bf..f493a1c 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java @@ -61,8 +61,13 @@ import org.apache.qpid.jms.test.Wait; import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError; import org.apache.qpid.jms.test.testpeer.basictypes.ConnectionError; +import org.apache.qpid.jms.test.testpeer.describedtypes.Declare; +import org.apache.qpid.jms.test.testpeer.describedtypes.Declared; import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher; +import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher; +import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher; import org.apache.qpid.jms.util.MetaDataSupport; +import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.UnsignedInteger; import org.apache.qpid.proton.amqp.transaction.TxnCapability; @@ -112,6 +117,13 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase { txCoordinatorMatcher.withCapabilities(arrayContaining(TxnCapability.LOCAL_TXN)); testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); + // First expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a declared disposition state containing the txnId. 
+ Binary txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); + TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); + declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); + testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); assertNotNull("Session should not be null", session); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/69de1a54/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java index 941ab2a..f2bf986 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java @@ -265,6 +265,13 @@ public class QueueBrowserIntegrationTest extends QpidJmsTestCase { CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); + // First expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a declared disposition state containing the txnId. 
+ Binary txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); + TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); + declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); + testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); Queue queue = session.createQueue("myQueue"); @@ -281,13 +288,6 @@ public class QueueBrowserIntegrationTest extends QpidJmsTestCase { // Expect a non-draining flow to reopen the credit window again afterwards testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH))); - // Expect an unsettled 'declare' transfer to the txn coordinator, and - // reply with a declared disposition state containing the txnId. - Binary txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); - TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); - declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); - testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); - QueueBrowser browser = session.createBrowser(queue); Enumeration<?> queueView = browser.getEnumeration(); assertNotNull(queueView); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/69de1a54/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java index 496117c..4daec4b 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java +++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java @@ -970,12 +970,6 @@ public class SessionIntegrationTest extends QpidJmsTestCase { CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - Queue queue = session.createQueue("myQueue"); - - testPeer.expectReceiverAttach(); - testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), transferCount); - // First expect an unsettled 'declare' transfer to the txn coordinator, and // reply with a declared disposition state containing the txnId. Binary txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); @@ -983,6 +977,12 @@ public class SessionIntegrationTest extends QpidJmsTestCase { declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue("myQueue"); + + testPeer.expectReceiverAttach(); + testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), transferCount); + for (int i = 1; i <= consumeCount; i++) { // Then expect an *settled* TransactionalState disposition for each message once received by the consumer TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher(); @@ -1010,6 +1010,13 @@ public class SessionIntegrationTest extends QpidJmsTestCase { dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge)); testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Accepted(), true); + // Then expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a declared disposition state containing the txnId. 
+ txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); + declareMatcher = new TransferPayloadCompositeMatcher(); + declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); + testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); + session.commit(); testPeer.waitForAllHandlersToComplete(1000); @@ -1073,13 +1080,6 @@ public class SessionIntegrationTest extends QpidJmsTestCase { CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - Queue queue = session.createQueue("myQueue"); - - // Create a producer to use in provoking creation of the AMQP transaction - testPeer.expectSenderAttach(); - MessageProducer producer = session.createProducer(queue); - // First expect an unsettled 'declare' transfer to the txn coordinator, and // reply with a Declared disposition state containing the txnId. Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); @@ -1087,6 +1087,13 @@ public class SessionIntegrationTest extends QpidJmsTestCase { declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue("myQueue"); + + // Create a producer to use in provoking creation of the AMQP transaction + testPeer.expectSenderAttach(); + MessageProducer producer = session.createProducer(queue); + // Expect the message which provoked creating the transaction. Check it carries // TransactionalState with the above txnId but has no outcome. Respond with a // TransactionalState with Accepted outcome. 
@@ -1129,12 +1136,6 @@ public class SessionIntegrationTest extends QpidJmsTestCase { CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - Queue queue = session.createQueue("myQueue"); - - testPeer.expectReceiverAttach(); - testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), transferCount); - // First expect an unsettled 'declare' transfer to the txn coordinator, and // reply with a declared disposition state containing the txnId. Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); @@ -1142,6 +1143,12 @@ public class SessionIntegrationTest extends QpidJmsTestCase { declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue("myQueue"); + + testPeer.expectReceiverAttach(); + testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), transferCount); + for (int i = 1; i <= consumeCount; i++) { // Then expect a *settled* TransactionalState disposition for each message once received by the consumer TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher(); @@ -1172,6 +1179,13 @@ public class SessionIntegrationTest extends QpidJmsTestCase { dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge)); testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Accepted(), true); + // Then expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a declared disposition state containing the txnId. 
+ txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); + declareMatcher = new TransferPayloadCompositeMatcher(); + declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); + testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); + // Expect the messages that were not consumed to be released int unconsumed = transferCount - consumeCount; for (int i = 1; i <= unconsumed; i++) { @@ -1199,6 +1213,13 @@ public class SessionIntegrationTest extends QpidJmsTestCase { CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); + // First expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a declared disposition state containing the txnId. + Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); + TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); + declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); + testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); Queue queue = session.createQueue("myQueue"); @@ -1212,13 +1233,6 @@ public class SessionIntegrationTest extends QpidJmsTestCase { testPeer.expectSenderAttach(); MessageProducer producer = session.createProducer(queue); - // First expect an unsettled 'declare' transfer to the txn coordinator, and - // reply with a declared disposition state containing the txnId. 
- Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); - TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); - declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); - testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); - // Expect the message which provoked creating the transaction TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); @@ -1247,6 +1261,13 @@ public class SessionIntegrationTest extends QpidJmsTestCase { dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge)); testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Accepted(), true); + // Now expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a declared disposition state containing the txnId. + txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); + declareMatcher = new TransferPayloadCompositeMatcher(); + declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); + testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); + // Expect the messages that were not consumed to be released for (int i = 1; i <= messageCount; i++) { testPeer.expectDisposition(true, new ReleasedMatcher()); @@ -1273,6 +1294,13 @@ public class SessionIntegrationTest extends QpidJmsTestCase { CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); + // First expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a declared disposition state containing the txnId. 
+ Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); + TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); + declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); + testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); Queue queue = session.createQueue("myQueue"); @@ -1286,13 +1314,6 @@ public class SessionIntegrationTest extends QpidJmsTestCase { testPeer.expectSenderAttach(); MessageProducer producer = session.createProducer(queue); - // First expect an unsettled 'declare' transfer to the txn coordinator, and - // reply with a declared disposition state containing the txnId. - Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); - TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); - declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); - testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); - // Expect the message which provoked creating the transaction TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); @@ -1326,6 +1347,13 @@ public class SessionIntegrationTest extends QpidJmsTestCase { dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge)); testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Accepted(), true); + // Then expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a declared disposition state containing the txnId. 
+ txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); + declareMatcher = new TransferPayloadCompositeMatcher(); + declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); + testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); + // Expect the messages that were not consumed to be released for (int i = 1; i <= messageCount; i++) { testPeer.expectDisposition(true, new ReleasedMatcher()); @@ -1350,6 +1378,13 @@ public class SessionIntegrationTest extends QpidJmsTestCase { CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); + // First expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a declared disposition state containing the txnId. + Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); + TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); + declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); + testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); String queueName = "myQueue"; Queue queue = session.createQueue(queueName); @@ -1591,6 +1626,13 @@ public class SessionIntegrationTest extends QpidJmsTestCase { CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); + // First expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a declared disposition state containing the txnId. 
+ Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); + TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); + declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); + testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); // Create a consumer, don't expect any flow as the connection is stopped @@ -1605,13 +1647,6 @@ public class SessionIntegrationTest extends QpidJmsTestCase { testPeer.waitForAllHandlersToComplete(3000); - // First expect an unsettled 'declare' transfer to the txn coordinator, and - // reply with a declared disposition state containing the txnId. - Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); - TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); - declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); - testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); - for (int i = 1; i <= messageCount; i++) { // Then expect an *settled* TransactionalState disposition for each message once received by the consumer TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher(); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/69de1a54/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderOfflineBehaviorTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderOfflineBehaviorTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderOfflineBehaviorTest.java index 1124014..8e79286 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderOfflineBehaviorTest.java +++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderOfflineBehaviorTest.java @@ -189,7 +189,7 @@ public class FailoverProviderOfflineBehaviorTest extends FailoverProviderTestSup connection.close(); } - @Test(timeout=10000) + @Test // (timeout=10000) public void testTransactionCommitFails() throws Exception { connection = (JmsConnection) factory.createConnection(); connection.addConnectionListener(new ConnectionInterruptionListener()); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/69de1a54/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxConsumerFailoverTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxConsumerFailoverTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxConsumerFailoverTest.java index 81e8948..c9224af 100644 --- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxConsumerFailoverTest.java +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxConsumerFailoverTest.java @@ -128,6 +128,7 @@ public class JmsTxConsumerFailoverTest extends AmqpTestSupport { assertEquals(MSG_COUNT, proxy.getQueueSize()); try { + LOG.info("Session commit firing after connection failed."); session.commit(); fail("Session commit should have failed with TX rolled back."); } catch (TransactionRolledBackException rb) { @@ -168,8 +169,9 @@ public class JmsTxConsumerFailoverTest extends AmqpTestSupport { assertEquals(MSG_COUNT, proxy.getQueueSize()); try { + LOG.info("Transacted being rolled back after failover"); session.rollback(); - LOG.info("Transacted rollback after failover ok"); + LOG.info("Transacted rollback after failover"); } catch (JMSException ex) { fail("Session rollback should not have failed."); } 
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/69de1a54/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java index 2657a23..461bedb 100644 --- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java @@ -280,7 +280,8 @@ public class JmsTransactedConsumerTest extends AmqpTestSupport { // Create a new consumer consumer = session.createConsumer(queue); - message = (TextMessage) consumer.receive(3000); + message = (TextMessage) consumer.receive(5000); + assertNotNull(message); session.commit(); assertEquals(0, proxy.getQueueSize()); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
