Repository: activemq Updated Branches: refs/heads/master 74a5381b9 -> f71e0ee15
NO-JIRA Add some additional testing around outcomes Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f71e0ee1 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f71e0ee1 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f71e0ee1 Branch: refs/heads/master Commit: f71e0ee15b2c456dac28cf456deafb764221afa6 Parents: 74a5381 Author: Timothy Bish <tabish...@gmail.com> Authored: Tue Oct 11 18:58:51 2016 -0400 Committer: Timothy Bish <tabish...@gmail.com> Committed: Tue Oct 11 18:58:51 2016 -0400 ---------------------------------------------------------------------- .../transport/amqp/client/AmqpMessage.java | 15 ++++- .../transport/amqp/client/AmqpReceiver.java | 37 +++++++++++ .../amqp/interop/AmqpReceiverTest.java | 68 ++++++++++++++++++++ 3 files changed, 119 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/f71e0ee1/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java index 952f98d..d28ac8e 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java @@ -201,7 +201,7 @@ public class AmqpMessage { /** * Release the message, remote can redeliver it elsewhere. * - * @throws Exception if an error occurs during the reject. + * @throws Exception if an error occurs during the release. */ public void release() throws Exception { if (receiver == null) { @@ -211,6 +211,19 @@ public class AmqpMessage { receiver.release(delivery); } + /** + * Reject the message, remote can redeliver it elsewhere. + * + * @throws Exception if an error occurs during the reject. + */ + public void reject() throws Exception { + if (receiver == null) { + throw new IllegalStateException("Can't release non-received message."); + } + + receiver.reject(delivery); + } + //----- Convenience methods for constructing outbound messages -----------// /** http://git-wip-us.apache.org/repos/asf/activemq/blob/f71e0ee1/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java index 3543ae3..d2b859a 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java @@ -605,6 +605,43 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> { } /** + * Reject a message that was dispatched under the given Delivery instance. + * + * @param delivery + * the Delivery instance to reject. + * + * @throws IOException if an error occurs while sending the release. + */ + public void reject(final Delivery delivery) throws IOException { + checkClosed(); + + if (delivery == null) { + throw new IllegalArgumentException("Delivery to release cannot be null"); + } + + final ClientFuture request = new ClientFuture(); + session.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + try { + if (!delivery.isSettled()) { + delivery.disposition(new Rejected()); + delivery.settle(); + session.pumpToProtonTransport(request); + } + request.onSuccess(); + } catch (Exception e) { + request.onFailure(e); + } + } + }); + + request.sync(); + } + + /** * @return an unmodifiable view of the underlying Receiver instance. */ public Receiver getReceiver() { http://git-wip-us.apache.org/repos/asf/activemq/blob/f71e0ee1/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java index 0b9a379..2a06561 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java @@ -461,6 +461,74 @@ public class AmqpReceiverTest extends AmqpClientTestSupport { } @Test(timeout = 30000) + public void testReleasedDisposition() throws Exception { + sendMessages(getTestName(), 1, false); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = trackConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + receiver.flow(2); + + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull("did not receive message first time", message); + + Message protonMessage = message.getWrappedMessage(); + assertNotNull(protonMessage); + assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount()); + + message.release(); + + // Read the message again and validate its state + + message = receiver.receive(10, TimeUnit.SECONDS); + assertNotNull("did not receive message again", message); + + message.accept(); + + protonMessage = message.getWrappedMessage(); + assertNotNull(protonMessage); + assertEquals("Unexpected updated value for AMQP delivery-count", 0, protonMessage.getDeliveryCount()); + + connection.close(); + } + + @Test(timeout = 30000) + public void testRejectedDisposition() throws Exception { + sendMessages(getTestName(), 1, false); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = trackConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + receiver.flow(2); + + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull("did not receive message first time", message); + + Message protonMessage = message.getWrappedMessage(); + assertNotNull(protonMessage); + assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount()); + + message.reject(); + + // Read the message again and validate its state + + message = receiver.receive(10, TimeUnit.SECONDS); + assertNotNull("did not receive message again", message); + + message.accept(); + + protonMessage = message.getWrappedMessage(); + assertNotNull(protonMessage); + assertEquals("Unexpected updated value for AMQP delivery-count", 1, protonMessage.getDeliveryCount()); + + connection.close(); + } + + @Test(timeout = 30000) public void testModifiedDispositionWithDeliveryFailedWithoutUndeliverableHereFieldsSet() throws Exception { doModifiedDispositionTestImpl(Boolean.TRUE, null); }