Repository: activemq-artemis Updated Branches: refs/heads/master 8c310a2ce -> 7374d2f72
ARTEMIS-60 Validate AMQP sender applied TransactionState Update the AMQP test client to allow for better inspection of the delivery updates that happen during normal use. Use those modifications to check that when the broker's sender accepts and settles a non-settled disposition it adds a proper TransactionState disposition with the correct outcome and txn-id in that state. Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a0948928 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a0948928 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a0948928 Branch: refs/heads/master Commit: a0948928c3b34788e6f371fd49eb4d07273ba08b Parents: 8c310a2 Author: Timothy Bish <[email protected]> Authored: Mon Mar 20 16:46:18 2017 -0400 Committer: Timothy Bish <[email protected]> Committed: Mon Mar 20 16:46:18 2017 -0400 ---------------------------------------------------------------------- .../amqp/client/AmqpAbstractResource.java | 12 +++- .../transport/amqp/client/AmqpConnection.java | 3 +- .../transport/amqp/client/AmqpEventSink.java | 5 +- .../transport/amqp/client/AmqpReceiver.java | 4 +- .../transport/amqp/client/AmqpSender.java | 14 +--- .../amqp/client/AmqpTransactionCoordinator.java | 11 ++-- .../integration/amqp/AmqpTransactionTest.java | 67 ++++++++++++++++++++ 7 files changed, 94 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0948928/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java index 0ab4596..691c11f 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java @@ -242,7 +242,8 @@ public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpRe } @Override - public void processDeliveryUpdates(AmqpConnection connection) throws IOException { + public void processDeliveryUpdates(AmqpConnection connection, Delivery delivery) throws IOException { + doDeliveryUpdate(delivery); } @Override @@ -305,7 +306,14 @@ public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpRe } protected void doDeliveryUpdate(Delivery delivery) { - + AmqpValidator validator = getStateInspector(); + if (validator != null) { + try { + validator.inspectDeliveryUpdate(delivery); + } catch (Throwable error) { + validator.markAsInvalid(error.getMessage()); + } + } } //----- Private implementation utility methods ---------------------------// http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0948928/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java index fa44c02..76717fd 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java @@ -43,6 +43,7 @@ import org.apache.activemq.transport.amqp.client.util.UnmodifiableConnection; import 
org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.engine.Collector; import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Event; import org.apache.qpid.proton.engine.Event.Type; @@ -697,7 +698,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements break; case DELIVERY: amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext(); - amqpEventSink.processDeliveryUpdates(this); + amqpEventSink.processDeliveryUpdates(this, (Delivery) protonEvent.getContext()); break; default: break; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0948928/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpEventSink.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpEventSink.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpEventSink.java index 1c511a5..5581328 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpEventSink.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpEventSink.java @@ -18,6 +18,8 @@ package org.apache.activemq.transport.amqp.client; import java.io.IOException; +import org.apache.qpid.proton.engine.Delivery; + /** * Interface used by classes that want to process AMQP events sent from * the transport layer. @@ -53,9 +55,10 @@ public interface AmqpEventSink { * for the given endpoint. * * @param connection the AmqpConnection instance for easier access to fire events. + * @param delivery the Delivery that was updated. * @throws IOException if an error occurs while processing the update. 
*/ - void processDeliveryUpdates(AmqpConnection connection) throws IOException; + void processDeliveryUpdates(AmqpConnection connection, Delivery delivery) throws IOException; /** * Called when the Proton Engine signals an Flow related event has been triggered http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0948928/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java index cd76501..414f933 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java @@ -794,7 +794,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> { } @Override - public void processDeliveryUpdates(AmqpConnection connection) throws IOException { + public void processDeliveryUpdates(AmqpConnection connection, Delivery delivery) throws IOException { Delivery incoming = null; do { incoming = getEndpoint().current(); @@ -823,7 +823,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> { } } while (incoming != null); - super.processDeliveryUpdates(connection); + super.processDeliveryUpdates(connection, delivery); } private void processDelivery(Delivery incoming) throws Exception { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0948928/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java index 350a201..0a41ce6 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.transport.amqp.client; -import javax.jms.InvalidDestinationException; import java.io.IOException; import java.util.ArrayList; import java.util.LinkedHashSet; @@ -26,6 +25,8 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import javax.jms.InvalidDestinationException; + import org.apache.activemq.transport.amqp.client.util.AsyncResult; import org.apache.activemq.transport.amqp.client.util.ClientFuture; import org.apache.activemq.transport.amqp.client.util.UnmodifiableSender; @@ -419,7 +420,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> { } @Override - public void processDeliveryUpdates(AmqpConnection connection) throws IOException { + public void processDeliveryUpdates(AmqpConnection connection, Delivery updated) throws IOException { List<Delivery> toRemove = new ArrayList<>(); for (Delivery delivery : pending) { @@ -485,13 +486,4 @@ public class AmqpSender extends AmqpAbstractResource<Sender> { public String toString() { return getClass().getSimpleName() + "{ address = " + address + "}"; } - - @Override - protected void doDeliveryUpdate(Delivery delivery) { - try { - getStateInspector().inspectDeliveryUpdate(delivery); - } catch (Throwable error) { - getStateInspector().markAsInvalid(error.getMessage()); - } - } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0948928/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionCoordinator.java ---------------------------------------------------------------------- diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionCoordinator.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionCoordinator.java index 2e1a3ab..bc1030e 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionCoordinator.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionCoordinator.java @@ -16,9 +16,6 @@ */ package org.apache.activemq.transport.amqp.client; -import javax.jms.IllegalStateException; -import javax.jms.JMSException; -import javax.jms.TransactionRolledBackException; import java.io.IOException; import java.nio.BufferOverflowException; import java.util.HashMap; @@ -27,6 +24,10 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.TransactionRolledBackException; + import org.apache.activemq.transport.amqp.client.util.AsyncResult; import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport; import org.apache.qpid.proton.amqp.messaging.AmqpValue; @@ -67,7 +68,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<Sender> { } @Override - public void processDeliveryUpdates(AmqpConnection connection) throws IOException { + public void processDeliveryUpdates(AmqpConnection connection, Delivery delivery) throws IOException { try { Iterator<Delivery> deliveries = pendingDeliveries.iterator(); while (deliveries.hasNext()) { @@ -112,7 +113,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<Sender> { deliveries.remove(); } - super.processDeliveryUpdates(connection); + super.processDeliveryUpdates(connection, delivery); } catch (Exception e) { throw IOExceptionSupport.create(e); } 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0948928/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java index d49d499..3a9d498 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java @@ -41,6 +41,7 @@ import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; import org.apache.activemq.transport.amqp.client.AmqpValidator; import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.amqp.transaction.TransactionalState; import org.apache.qpid.proton.amqp.transport.DeliveryState; import org.apache.qpid.proton.engine.Delivery; @@ -920,6 +921,72 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { sendConnection.close(); consumerConnection.close(); } + } + + @Test(timeout = 30000) + public void testUnsettledTXMessageGetTransactedDispostion() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + assertNotNull(session); + + AmqpSender sender = session.createSender(getTestName()); + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + sender.send(message); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + receiver.setStateInspector(new AmqpValidator() { + @Override + public void 
inspectDeliveryUpdate(Delivery delivery) { + if (delivery.remotelySettled()) { + LOG.info("Receiver got delivery update for: {}", delivery); + if (!(delivery.getRemoteState() instanceof TransactionalState)) { + markAsInvalid("Transactionally acquire work no tagged as being in a transaction."); + } else { + TransactionalState txState = (TransactionalState) delivery.getRemoteState(); + if (!(txState.getOutcome() instanceof Accepted)) { + markAsInvalid("Transaction state lacks any outcome"); + } else if (txState.getTxnId() == null) { + markAsInvalid("Transaction state lacks any TX Id"); + } + } + + if (!(delivery.getLocalState() instanceof TransactionalState)) { + markAsInvalid("Transactionally acquire work no tagged as being in a transaction."); + } else { + TransactionalState txState = (TransactionalState) delivery.getLocalState(); + if (!(txState.getOutcome() instanceof Accepted)) { + markAsInvalid("Transaction state lacks any outcome"); + } else if (txState.getTxnId() == null) { + markAsInvalid("Transaction state lacks any TX Id"); + } + } + + TransactionalState localTxState = (TransactionalState) delivery.getLocalState(); + TransactionalState remoteTxState = (TransactionalState) delivery.getRemoteState(); + + if (!localTxState.getTxnId().equals(remoteTxState.getTxnId())) { /* compare txn-id with txn-id, not with the state object */ + markAsInvalid("Message not enrolled in expected transaction"); + } + } + } + }); + + session.begin(); + + assertTrue(session.isInTransaction()); + + receiver.flow(1); + AmqpMessage received = receiver.receive(2, TimeUnit.SECONDS); + assertNotNull(received); + received.accept(false); + + session.commit(); + + receiver.getStateInspector().assertValid(); /* the validator under test is installed on the receiver */ + + connection.close(); } }
