Repository: qpid-broker-j Updated Branches: refs/heads/master 403a725ab -> 72ed1aa52
QPID-7842: Add protocol test ensuring that the same delivery tag can be re-used after delivery is settled Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/72ed1aa5 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/72ed1aa5 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/72ed1aa5 Branch: refs/heads/master Commit: 72ed1aa525272e70c696459d0f13e67781970258 Parents: 403a725 Author: Alex Rudyy <[email protected]> Authored: Fri Jun 30 10:52:04 2017 +0100 Committer: Alex Rudyy <[email protected]> Committed: Fri Jun 30 10:52:04 2017 +0100 ---------------------------------------------------------------------- .../protocol/v1_0/messaging/TransferTest.java | 105 +++++++++++++++++-- 1 file changed, 96 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/72ed1aa5/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java index c63e3c3..a5f23e2 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java @@ -21,8 +21,10 @@ package org.apache.qpid.tests.protocol.v1_0.messaging; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.hamcrest.CoreMatchers.anyOf; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -688,19 +690,23 @@ public class TransferTest extends ProtocolTestBase @Test @SpecificationTest(section = "2.7.5", description = "[delivery-tag] uniquely identifies the delivery attempt for a given message on this link.") - public void transfersWithDuplicateDeliveryTag() throws Exception + public void transfersWithDuplicateUnsettledDeliveryTag() throws Exception { try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { final Binary deliveryTag = new Binary("testDeliveryTag".getBytes(UTF_8)); Interaction interaction = transport.newInteraction(); - interaction.negotiateProtocol().consumeResponse() - .open().consumeResponse(Open.class) - .begin().consumeResponse(Begin.class) + interaction.negotiateProtocol() + .consumeResponse() + .open() + .consumeResponse(Open.class) + .begin() + .consumeResponse(Begin.class) .attachRole(Role.SENDER) .attachRcvSettleMode(ReceiverSettleMode.SECOND) - .attach().consumeResponse(Attach.class) + .attach() + .consumeResponse(Attach.class) .consumeResponse(Flow.class); Flow flow = interaction.getLatestResponse(Flow.class); @@ -710,11 +716,12 @@ public class TransferTest extends ProtocolTestBase .transferDeliveryTag(deliveryTag) .transferPayloadData("test") .transfer() - + .sync() .transferDeliveryTag(deliveryTag) .transferDeliveryId(UnsignedInteger.ONE) .transferPayloadData("test2") - .transfer(); + .transfer() + .sync(); do { @@ -722,7 +729,7 @@ public class TransferTest extends ProtocolTestBase Response<?> response = interaction.getLatestResponse(); assertThat(response, is(notNullValue())); - Object body = response.getBody(); + Object body = response.getBody(); if (body instanceof ErrorCarryingFrameBody) { Error error = ((ErrorCarryingFrameBody) body).getError(); @@ -731,7 +738,7 @@ public class TransferTest extends ProtocolTestBase } else if (body instanceof Disposition) { - Disposition disposition = (Disposition)body; + Disposition disposition = (Disposition) body; assertThat(disposition.getSettled(), is(equalTo(false))); assertThat(disposition.getFirst(), is(not(equalTo(UnsignedInteger.ONE)))); assertThat(disposition.getLast(), is(not(equalTo(UnsignedInteger.ONE)))); @@ -743,4 +750,84 @@ public class TransferTest extends ProtocolTestBase } while (true); } } + + @Test + @SpecificationTest(section = "2.6.12", + description = "The delivery-tag MUST be unique amongst all deliveries that" + + " could be considered unsettled by either end of the link.") + public void deliveryTagCanBeReusedAfterDeliveryIsSettled() throws Exception + { + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Binary deliveryTag = new Binary("testDeliveryTag".getBytes(UTF_8)); + + Interaction interaction = transport.newInteraction(); + interaction.negotiateProtocol() + .consumeResponse() + .open() + .consumeResponse(Open.class) + .begin() + .consumeResponse(Begin.class) + .attachRole(Role.SENDER) + .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME) + .attach() + .consumeResponse(Attach.class) + .consumeResponse(Flow.class); + + Flow flow = interaction.getLatestResponse(Flow.class); + assertThat(flow.getLinkCredit().intValue(), is(greaterThan(1))); + + interaction.transferDeliveryId(UnsignedInteger.ZERO) + .transferDeliveryTag(deliveryTag) + .transferPayloadData("test") + .transfer() + .sync() + + .transferDeliveryTag(deliveryTag) + .transferDeliveryId(UnsignedInteger.ONE) + .transferPayloadData("test2") + .transfer() + .sync(); + + boolean firstSettled = false, secondSettled = false; + do + { + interaction.consumeResponse(); + Response<?> response = interaction.getLatestResponse(); + assertThat(response, is(notNullValue())); + + Object body = response.getBody(); + + if (body instanceof Disposition) + { + Disposition disposition = (Disposition) body; + assertThat(disposition.getSettled(), is(equalTo(true))); + assertThat(disposition.getFirst(), + anyOf(equalTo(UnsignedInteger.ZERO), equalTo(UnsignedInteger.ONE))); + assertThat(disposition.getLast(), + anyOf(equalTo(UnsignedInteger.ZERO), equalTo(UnsignedInteger.ONE), nullValue())); + + if (UnsignedInteger.ZERO.equals(disposition.getFirst())) + { + firstSettled = true; + } + if (UnsignedInteger.ONE.equals(disposition.getFirst()) + || UnsignedInteger.ONE.equals(disposition.getLast())) + { + secondSettled = true; + } + } + else if (!(body instanceof Flow)) + { + fail("Unexpected response " + body); + } + } + while (!firstSettled || !secondSettled); + + transport.doCloseConnection(); + + assumeThat(getBrokerAdmin().isQueueDepthSupported(), is(true)); + assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(2))); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
