NO-JIRA: Adding an extra test to AmqpTransactionTest. The test I'm adding was backported from Artemis. It validates that the ACKs are nacked when connection.close() is called, to avoid a situation where the TX would sit on a Transaction Resource Manager somewhere, like an XID.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/195046c5 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/195046c5 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/195046c5 Branch: refs/heads/activemq-5.14.x Commit: 195046c50359667fc6d9480cf13c3d9e1d123d66 Parents: 8e6fe41 Author: Clebert Suconic <[email protected]> Authored: Wed Sep 21 16:12:52 2016 -0400 Committer: Timothy Bish <[email protected]> Committed: Tue Sep 27 12:16:16 2016 -0400 ---------------------------------------------------------------------- .../amqp/interop/AmqpTransactionTest.java | 45 ++++++++++++++++++++ 1 file changed, 45 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/195046c5/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java index 994a2e7..0815f8a 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java @@ -151,6 +151,51 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { } @Test(timeout = 60000) + public void testReceiveAfterConnectionClose() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + final QueueViewMBean queue = getProxyToQueue(getTestName()); + + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + 
sender.send(message); + + assertEquals(1, queue.getQueueSize()); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + + session.begin(); + + receiver.flow(1); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + received.accept(); + + // this will force a rollback on the TX (It should at least) + connection.close(); + + connection = client.connect(); + session = connection.createSession(); + receiver = session.createReceiver(getTestName()); + session.begin(); + receiver.flow(1); + + received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + received.accept(); + + session.commit(); + + assertEquals(0, queue.getQueueSize()); + + connection.close(); + } + + + @Test(timeout = 60000) public void testReceiveMessageWithRollback() throws Exception { AmqpClient client = createAmqpClient(); AmqpConnection connection = client.connect();
