This is an automated email from the ASF dual-hosted git repository. orudyy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
commit 9f9bfe92980c005eee825f0386307c951b3fafcc Author: Alex Rudyy <[email protected]> AuthorDate: Tue Aug 20 13:45:54 2019 +0100 QPID-8349: [Tests][AMQP 1.0] Improve transaction and transfer tests --- .../qpid/tests/protocol/v1_0/Interaction.java | 114 ++++++---- .../v1_0/InteractionTransactionalState.java | 12 ++ .../org/apache/qpid/tests/protocol/v1_0/Utils.java | 13 +- .../qpid/tests/protocol/v1_0/DecodeErrorTest.java | 123 ++++++++--- .../anonymousterminus/AnonymousTerminusTest.java | 81 +++---- .../extensions/qpid/queue/QueueDeletionTest.java | 4 +- .../protocol/v1_0/messaging/MessageFormat.java | 2 +- .../protocol/v1_0/messaging/MultiTransferTest.java | 9 +- .../protocol/v1_0/messaging/TransferTest.java | 238 +++++++++++---------- .../protocol/v1_0/transaction/DischargeTest.java | 205 +++++++++--------- .../transaction/TransactionalTransferTest.java | 94 ++++---- .../protocol/v1_0/transport/link/FlowTest.java | 221 ++++++++++++------- .../v1_0/transport/link/ResumeDeliveriesTest.java | 16 +- .../qpid/tests/protocol/AbstractInteraction.java | 7 +- 14 files changed, 660 insertions(+), 479 deletions(-) diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java index 98ffe45..ab2c959 100644 --- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java +++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java @@ -20,12 +20,6 @@ package org.apache.qpid.tests.protocol.v1_0; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; - import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; @@ -117,6 +111,7 @@ public class Interaction extends AbstractInteraction<Interaction> private Map<String, Object> _latestDeliveryApplicationProperties; private Map<Class, FrameBody> _latestResponses = new HashMap<>(); private AtomicLong _receivedDeliveryCount = new AtomicLong(); + private AtomicLong _coordinatorCredits = new AtomicLong(); Interaction(final FrameTransport frameTransport) { @@ -955,6 +950,12 @@ public class Interaction extends AbstractInteraction<Interaction> public Interaction txnAttachCoordinatorLink(InteractionTransactionalState transactionalState) throws Exception { + return txnAttachCoordinatorLink(transactionalState, Accepted.ACCEPTED_SYMBOL, Rejected.REJECTED_SYMBOL); + } + + public Interaction txnAttachCoordinatorLink(final InteractionTransactionalState transactionalState, + final Symbol... outcomes) throws Exception + { Attach attach = new Attach(); attach.setName("testTransactionCoordinator-" + transactionalState.getHandle()); attach.setHandle(transactionalState.getHandle()); @@ -963,70 +964,90 @@ public class Interaction extends AbstractInteraction<Interaction> attach.setRole(Role.SENDER); Source source = new Source(); attach.setSource(source); - source.setOutcomes(Accepted.ACCEPTED_SYMBOL, Rejected.REJECTED_SYMBOL); + source.setOutcomes(outcomes); sendPerformativeAndChainFuture(attach, _sessionChannel); consumeResponse(Attach.class); - consumeResponse(Flow.class); + final Flow flow = consumeResponse(Flow.class).getLatestResponse(Flow.class); + _coordinatorCredits.set(flow.getLinkCredit().longValue()); return this; } public Interaction txnDeclare(final InteractionTransactionalState txnState) throws Exception { - Transfer transfer = createTransactionTransfer(txnState.getHandle()); - transferPayload(transfer, new Declare()); - sendPerformativeAndChainFuture(transfer, _sessionChannel); - consumeResponse(Disposition.class); - Disposition declareTransactionDisposition = getLatestResponse(Disposition.class); - assertThat(declareTransactionDisposition.getSettled(), is(equalTo(true))); - assertThat(declareTransactionDisposition.getState(), is(instanceOf(Declared.class))); - Binary transactionId = ((Declared) declareTransactionDisposition.getState()).getTxnId(); - assertThat(transactionId, is(notNullValue())); - consumeResponse(Flow.class); + sendPayloadToCoordinator(new Declare(), txnState.getHandle()); + final DeliveryState state = handleCoordinatorResponse(); + txnState.setDeliveryState(state); + final Binary transactionId = ((Declared) state).getTxnId(); txnState.setLastTransactionId(transactionId); return this; } - public Interaction discharge(final InteractionTransactionalState txnState, final boolean failed) throws Exception + public Interaction txnSendDischarge(final InteractionTransactionalState txnState, final boolean failed) + throws Exception { final Discharge discharge = new Discharge(); discharge.setTxnId(txnState.getCurrentTransactionId()); discharge.setFail(failed); - - Transfer transfer = createTransactionTransfer(txnState.getHandle()); - transferPayload(transfer, discharge); - sendPerformativeAndChainFuture(transfer, _sessionChannel); + sendPayloadToCoordinator(discharge, txnState.getHandle()); return this; } public Interaction txnDischarge(final InteractionTransactionalState txnState, boolean failed) throws Exception { - discharge(txnState, failed); + txnSendDischarge(txnState, failed); + final DeliveryState state = handleCoordinatorResponse(); + txnState.setDeliveryState(state); + txnState.setLastTransactionId(null); + return this; + } + + private void sendPayloadToCoordinator(final Object payload, final UnsignedInteger handle) + throws Exception + { + final Transfer transfer = createTransactionTransfer(handle); + transferPayload(transfer, payload); + sendPerformativeAndChainFuture(transfer, _sessionChannel); + } + + private DeliveryState handleCoordinatorResponse() throws Exception + { + final Set<Class<?>> expected = new HashSet<>(Collections.singletonList(Disposition.class)); - Disposition declareTransactionDisposition = null; - Flow coordinatorFlow = null; + if (_coordinatorCredits.decrementAndGet() == 0) + { + expected.add(Flow.class); + } + + final Map<Class<?>, ?> responses = consumeResponses(expected); + + final Disposition disposition = (Disposition) responses.get(Disposition.class); + if (expected.contains(Flow.class)) + { + Flow flow = (Flow) responses.get(Flow.class); + _coordinatorCredits.set(flow.getLinkCredit().longValue()); + } + if (!Boolean.TRUE.equals(disposition.getSettled())) + { + throw new IllegalStateException("Coordinator disposition is not settled"); + } + return disposition.getState(); + } + + private Map<Class<?>, ?> consumeResponses(final Set<Class<?>> responseTypes) + throws Exception + { + Map<Class<?>, Object> results = new HashMap<>(); do { - consumeResponse(Disposition.class, Flow.class); - Response<?> response = getLatestResponse(); - if (response.getBody() instanceof Disposition) + Response<?> response = consumeResponse(responseTypes).getLatestResponse(); + if (response != null && response.getBody() instanceof FrameBody) { - declareTransactionDisposition = (Disposition) response.getBody(); + Class<?> bodyClass = response.getBody().getClass(); + results.put(bodyClass, response.getBody()); } - if (response.getBody() instanceof Flow) - { - final Flow flowResponse = (Flow) response.getBody(); - if (flowResponse.getHandle().equals(txnState.getHandle())) - { - coordinatorFlow = flowResponse; - } - } - } while(declareTransactionDisposition == null || coordinatorFlow == null); - - assertThat(declareTransactionDisposition.getSettled(), is(equalTo(true))); - assertThat(declareTransactionDisposition.getState(), is(instanceOf(Accepted.class))); - - txnState.setLastTransactionId(null); - return this; + } + while (!results.keySet().containsAll(responseTypes)); + return results; } private Transfer createTransactionTransfer(final UnsignedInteger handle) @@ -1034,7 +1055,8 @@ public class Interaction extends AbstractInteraction<Interaction> Transfer transfer = new Transfer(); transfer.setHandle(handle); transfer.setDeliveryId(getNextDeliveryId()); - transfer.setDeliveryTag(new Binary(("transaction-" + transfer.getDeliveryId()).getBytes(StandardCharsets.UTF_8))); + transfer.setDeliveryTag(new Binary(("transaction-" + + transfer.getDeliveryId()).getBytes(StandardCharsets.UTF_8))); return transfer; } diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InteractionTransactionalState.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InteractionTransactionalState.java index 061be92..b0832cc 100644 --- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InteractionTransactionalState.java +++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InteractionTransactionalState.java @@ -21,12 +21,14 @@ package org.apache.qpid.tests.protocol.v1_0; import org.apache.qpid.server.protocol.v1_0.type.Binary; +import org.apache.qpid.server.protocol.v1_0.type.DeliveryState; import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; public class InteractionTransactionalState { private final UnsignedInteger _handle; private Binary _lastTransactionId; + private DeliveryState _deliveryState; public InteractionTransactionalState(final UnsignedInteger handle) { @@ -47,4 +49,14 @@ public class InteractionTransactionalState { return _lastTransactionId; } + + public DeliveryState getDeliveryState() + { + return _deliveryState; + } + + public void setDeliveryState(final DeliveryState deliveryState) + { + _deliveryState = deliveryState; + } } diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java index e846b5f..2ac50f4 100644 --- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java +++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java @@ -20,6 +20,7 @@ package org.apache.qpid.tests.protocol.v1_0; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; import static org.junit.Assume.assumeThat; @@ -28,6 +29,7 @@ import java.net.InetSocketAddress; import java.util.stream.IntStream; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.protocol.v1_0.type.Binary; import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted; import org.apache.qpid.server.protocol.v1_0.type.messaging.Header; @@ -37,6 +39,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Detach; import org.apache.qpid.server.protocol.v1_0.type.transport.Flow; import org.apache.qpid.server.protocol.v1_0.type.transport.Open; import org.apache.qpid.server.protocol.v1_0.type.transport.Role; +import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode; import org.apache.qpid.tests.utils.BrokerAdmin; public class Utils @@ -83,7 +86,7 @@ public class Utils .attachSourceAddress(queueName) .attach().consumeResponse() .flowIncomingWindow(UnsignedInteger.ONE) - .flowNextIncomingId(UnsignedInteger.ZERO) + .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount() .flowOutgoingWindow(UnsignedInteger.ZERO) .flowNextOutgoingId(UnsignedInteger.ZERO) .flowLinkCredit(UnsignedInteger.ONE) @@ -157,6 +160,7 @@ public class Utils .begin().consumeResponse(Begin.class) .attachRole(Role.SENDER) .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME) + .attachSndSettleMode(SenderSettleMode.SETTLED) .attach().consumeResponse(Attach.class) .consumeResponse(Flow.class) .getLatestResponse(Flow.class); @@ -166,14 +170,21 @@ public class Utils message.length), flow.getLinkCredit().intValue(), is(greaterThan(message.length))); + + int tag = 0; for (String payload : message) { interaction.transferPayloadData(payload) .transferSettled(true) + .transferDeliveryId() + .transferDeliveryTag(new Binary(String.valueOf(tag).getBytes(UTF_8))) .transfer() .sync(); + tag++; } + interaction.doCloseConnection(); } } } + } diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java index 1ca4419..b079122 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java @@ -24,14 +24,18 @@ import static org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError.DECO import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; import static org.junit.Assert.fail; +import static org.junit.Assume.assumeThat; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; +import org.hamcrest.Matchers; import org.junit.Before; import org.junit.Test; @@ -39,6 +43,8 @@ import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.protocol.v1_0.codec.StringWriter; import org.apache.qpid.server.protocol.v1_0.type.Symbol; import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; +import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue; +import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection; import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotations; import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotationsSection; import org.apache.qpid.server.protocol.v1_0.type.messaging.Header; @@ -53,11 +59,13 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.End; import org.apache.qpid.server.protocol.v1_0.type.transport.Error; import org.apache.qpid.server.protocol.v1_0.type.transport.Flow; import org.apache.qpid.server.protocol.v1_0.type.transport.Open; +import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode; import org.apache.qpid.server.protocol.v1_0.type.transport.Role; import org.apache.qpid.tests.protocol.Response; import org.apache.qpid.tests.protocol.SpecificationTest; import org.apache.qpid.tests.utils.BrokerAdmin; import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; +import org.apache.qpid.tests.utils.BrokerSpecific; public class DecodeErrorTest extends BrokerAdminUsingTestBase { @@ -74,44 +82,46 @@ public class DecodeErrorTest extends BrokerAdminUsingTestBase @SpecificationTest(section = "3.2", description = "Altogether a message consists of the following sections: Zero or one header," + " Zero or one delivery-annotations, [...]") - public void illegalMessageFormatPayload() throws Exception + @BrokerSpecific(kind = BrokerAdmin.KIND_BROKER_J) + public void illegalMessage() throws Exception { try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { + final Interaction interaction = transport.newInteraction(); + final Attach attach = interaction.negotiateProtocol() + .consumeResponse() + .open() + .consumeResponse(Open.class) + .begin() + .consumeResponse(Begin.class) + .attachRole(Role.SENDER) + .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME) + .attachRcvSettleMode(ReceiverSettleMode.SECOND) + .attach() + .consumeResponse(Attach.class) + .getLatestResponse(Attach.class); + assumeThat(attach.getRcvSettleMode(), is(equalTo(ReceiverSettleMode.SECOND))); - List<QpidByteBuffer> payloads = new ArrayList<>(); - final HeaderSection headerSection = new Header().createEncodingRetainingSection(); - payloads.add(headerSection.getEncodedForm()); - headerSection.dispose(); - final StringWriter stringWriter = new StringWriter("string in between annotation sections"); - QpidByteBuffer encodedString = QpidByteBuffer.allocate(stringWriter.getEncodedSize()); - stringWriter.writeToBuffer(encodedString); - encodedString.flip(); - payloads.add(encodedString); - final DeliveryAnnotationsSection - deliveryAnnotationsSection = - new DeliveryAnnotations(Collections.emptyMap()).createEncodingRetainingSection(); - payloads.add(deliveryAnnotationsSection.getEncodedForm()); - deliveryAnnotationsSection.dispose(); + final Flow flow = interaction.consumeResponse(Flow.class).getLatestResponse(Flow.class); + assumeThat(flow.getLinkCredit(), is(greaterThan(UnsignedInteger.ZERO))); - final Detach detachResponse; - try (QpidByteBuffer combinedPayload = QpidByteBuffer.concatenate(payloads)) + final List<QpidByteBuffer> payloads = buildInvalidMessage(); + try { - detachResponse = transport.newInteraction() - .negotiateProtocol().consumeResponse() - .open().consumeResponse(Open.class) - .begin().consumeResponse(Begin.class) - .attachRole(Role.SENDER) - .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME) - .attach().consumeResponse(Attach.class) - .consumeResponse(Flow.class) - .transferMessageFormat(UnsignedInteger.ZERO) - .transferPayload(combinedPayload) - .transfer() - .consumeResponse() - .getLatestResponse(Detach.class); + try (QpidByteBuffer combinedPayload = QpidByteBuffer.concatenate(payloads)) + { + interaction.transferMessageFormat(UnsignedInteger.ZERO) + .transferPayload(combinedPayload) + .transfer(); + } } - payloads.forEach(QpidByteBuffer::dispose); + finally + { + payloads.forEach(QpidByteBuffer::dispose); + } + + final Detach detachResponse = interaction.consumeResponse() + .getLatestResponse(Detach.class); assertThat(detachResponse.getError(), is(notNullValue())); assertThat(detachResponse.getError().getCondition(), is(equalTo(DECODE_ERROR))); } @@ -148,6 +158,10 @@ public class DecodeErrorTest extends BrokerAdminUsingTestBase { error = ((Close) responseBody).getError(); } + else if (responseBody instanceof Detach) + { + error = ((Detach) responseBody).getError(); + } else { fail(String.format("Expected response of either Detach, End, or Close. Got '%s'", responseBody)); @@ -200,4 +214,51 @@ public class DecodeErrorTest extends BrokerAdminUsingTestBase assertThat(error.getCondition(), is(equalTo(DECODE_ERROR))); } } + + private List<QpidByteBuffer> buildInvalidMessage() + { + final List<QpidByteBuffer> payloads = new ArrayList<>(); + final Header header = new Header(); + header.setTtl(UnsignedInteger.valueOf(1000L)); + final HeaderSection headerSection = header.createEncodingRetainingSection(); + try + { + payloads.add(headerSection.getEncodedForm()); + } + finally + { + headerSection.dispose(); + } + + final StringWriter stringWriter = new StringWriter("string in between annotation sections"); + QpidByteBuffer encodedString = QpidByteBuffer.allocate(stringWriter.getEncodedSize()); + stringWriter.writeToBuffer(encodedString); + encodedString.flip(); + payloads.add(encodedString); + + final Map<Symbol, Object> annoationMap = Collections.singletonMap(Symbol.valueOf("foo"), "bar"); + final DeliveryAnnotations annotations = new DeliveryAnnotations(annoationMap); + final DeliveryAnnotationsSection deliveryAnnotationsSection = annotations.createEncodingRetainingSection(); + try + { + + payloads.add(deliveryAnnotationsSection.getEncodedForm()); + } + finally + { + deliveryAnnotationsSection.dispose(); + } + + final AmqpValueSection payload = new AmqpValue(getTestName()).createEncodingRetainingSection(); + try + { + payloads.add(payload.getEncodedForm()); + } + finally + { + payload.dispose(); + } + return payloads; + } + } diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java index 1878950..a7ef076 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java @@ -46,8 +46,6 @@ import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted; import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties; import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected; import org.apache.qpid.server.protocol.v1_0.type.messaging.Target; -import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator; -import org.apache.qpid.server.protocol.v1_0.type.transaction.Discharge; import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionError; import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState; import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError; @@ -104,6 +102,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase .attach().consumeResponse(Attach.class) .consumeResponse(Flow.class) + .transferDeliveryId() .transferPayload(generateMessagePayloadToDestination(BrokerAdmin.TEST_QUEUE_NAME)) .transferSettled(Boolean.TRUE) .transferDeliveryTag(_deliveryTag) @@ -139,12 +138,13 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase .attach().consumeResponse(Attach.class) .consumeResponse(Flow.class) + .transferDeliveryId() .transferPayload(generateMessagePayloadToDestination("Unknown")) .transferSettled(Boolean.TRUE) .transferDeliveryTag(_deliveryTag) .transfer(); - Detach detach = interaction.consumeResponse().getLatestResponse(Detach.class); + Detach detach = interaction.consumeResponse(Detach.class).getLatestResponse(Detach.class); Error error = detach.getError(); assertThat(error, is(notNullValue())); assertThat(error.getCondition(), is(equalTo(AmqpError.NOT_FOUND))); @@ -179,6 +179,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase .attach().consumeResponse(Attach.class) .consumeResponse(Flow.class) + .transferDeliveryId() .transferPayload(generateMessagePayloadToDestination("Unknown")) .transferDeliveryTag(_deliveryTag) .transfer() @@ -226,6 +227,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase .attach().consumeResponse(Attach.class) .consumeResponse(Flow.class) + .transferDeliveryId() .transferPayload(generateMessagePayloadToDestination("Unknown")) .transferDeliveryTag(_deliveryTag) .transfer(); @@ -262,6 +264,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase .attach().consumeResponse(Attach.class) .consumeResponse(Flow.class) + .transferDeliveryId() .transferHandle(linkHandle) .transferPayload(generateMessagePayloadToDestination(BrokerAdmin.TEST_QUEUE_NAME)) .transferDeliveryTag(_deliveryTag) @@ -271,6 +274,8 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase .txnDischarge(txnState, false); + assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class))); + Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME); assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT))); } @@ -295,6 +300,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase .attach().consumeResponse(Attach.class) .consumeResponse(Flow.class) + .transferDeliveryId() .transferHandle(linkHandle) .transferPayload(generateMessagePayloadToDestination(BrokerAdmin.TEST_QUEUE_NAME)) .transferDeliveryTag(_deliveryTag) @@ -314,6 +320,8 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase interaction.txnDischarge(txnState, false); + assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class))); + Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME); assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT))); } @@ -339,6 +347,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase .attach().consumeResponse(Attach.class) .consumeResponse(Flow.class) + .transferDeliveryId() .transferHandle(linkHandle) .transferPayload(generateMessagePayloadToDestination("Unknown")) .transferDeliveryTag(_deliveryTag) @@ -362,6 +371,8 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase assertThat(rejectedError.getInfo().get(DELIVERY_TAG), is(equalTo(_deliveryTag))); interaction.txnDischarge(txnState, false); + + assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class))); } } @@ -385,6 +396,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase .attach().consumeResponse(Attach.class) .consumeResponse(Flow.class) + .transferDeliveryId() .transferHandle(linkHandle) .transferPayload(generateMessagePayloadToDestination("Unknown")) .transferDeliveryId(UnsignedInteger.valueOf(1)) @@ -400,23 +412,11 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase assertThat(senderLinkDetachError.getInfo(), is(notNullValue())); assertThat(senderLinkDetachError.getInfo().get(DELIVERY_TAG), is(equalTo(_deliveryTag))); - final Discharge discharge = new Discharge(); - discharge.setTxnId(txnState.getCurrentTransactionId()); - discharge.setFail(false); - - interaction.transferHandle(txnState.getHandle()) - .transferSettled(Boolean.FALSE) - .transferDeliveryId(UnsignedInteger.valueOf(2)) - .transferDeliveryTag(new Binary(("transaction-" + 2).getBytes(StandardCharsets.UTF_8))) - .transferPayloadData(discharge).transfer(); - - Disposition dischargeTransactionDisposition = - getDispositionForDeliveryId(interaction, UnsignedInteger.valueOf(2)); - - assertThat(dischargeTransactionDisposition.getSettled(), is(equalTo(true))); - assertThat(dischargeTransactionDisposition.getState(), is(instanceOf(Rejected.class))); + interaction.txnDischarge(txnState, false); - Rejected rejected = (Rejected) dischargeTransactionDisposition.getState(); + DeliveryState txnDischargeDeliveryState = txnState.getDeliveryState(); + assertThat(txnDischargeDeliveryState, is(instanceOf(Rejected.class))); + Rejected rejected = (Rejected) txnDischargeDeliveryState; Error error = rejected.getError(); assertThat(error, is(notNullValue())); @@ -468,30 +468,20 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase .attach().consumeResponse(Attach.class) .consumeResponse(Flow.class) + .transferDeliveryId() .transferHandle(linkHandle) .transferPayload(generateMessagePayloadToDestination("Unknown")) .transferDeliveryTag(_deliveryTag) .transferTransactionalState(txnState.getCurrentTransactionId()) .transferSettled(Boolean.TRUE) - .transferDeliveryId(UnsignedInteger.valueOf(1)) .transfer(); - final Discharge discharge = new Discharge(); - discharge.setTxnId(txnState.getCurrentTransactionId()); - discharge.setFail(false); - interaction.transferHandle(txnState.getHandle()) - .transferDeliveryId(UnsignedInteger.valueOf(2)) - .transferSettled(Boolean.FALSE) - .transferDeliveryTag(new Binary(("transaction-" + 2).getBytes(StandardCharsets.UTF_8))) - .transferPayloadData(discharge).transfer(); - - Disposition dischargeTransactionDisposition = - getDispositionForDeliveryId(interaction, UnsignedInteger.valueOf(2)); + interaction.txnDischarge(txnState, false); - assertThat(dischargeTransactionDisposition.getSettled(), is(equalTo(true))); - assertThat(dischargeTransactionDisposition.getState(), is(instanceOf(Rejected.class))); + DeliveryState txDischargeDeliveryState = txnState.getDeliveryState(); + assertThat(txDischargeDeliveryState, is(instanceOf(Rejected.class))); - Rejected rejected = (Rejected) dischargeTransactionDisposition.getState(); + Rejected rejected = (Rejected) txDischargeDeliveryState; Error error = rejected.getError(); assertThat(error, is(notNullValue())); @@ -536,14 +526,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase interaction.begin() .consumeResponse(Begin.class) - .attachRole(Role.SENDER) - .attachName("testTransactionCoordinator-" + txnState.getHandle()) - .attachHandle(txnState.getHandle()) - .attachInitialDeliveryCount(UnsignedInteger.ZERO) - .attachTarget(new Coordinator()) - .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL) - .attach().consumeResponse(Attach.class) - .consumeResponse(Flow.class) + .txnAttachCoordinatorLink(txnState, Accepted.ACCEPTED_SYMBOL) .txnDeclare(txnState) .attachRole(Role.SENDER) @@ -554,22 +537,14 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase .attach().consumeResponse(Attach.class) .consumeResponse(Flow.class) + .transferDeliveryId() .transferHandle(linkHandle) .transferPayload(generateMessagePayloadToDestination("Unknown")) .transferDeliveryTag(_deliveryTag) .transferTransactionalState(txnState.getCurrentTransactionId()) .transferSettled(Boolean.TRUE) - .transfer(); - - final Discharge discharge = new Discharge(); - discharge.setTxnId(txnState.getCurrentTransactionId()); - discharge.setFail(false); - - interaction.transferHandle(txnState.getHandle()) - .transferSettled(Boolean.FALSE) - .transferDeliveryId(UnsignedInteger.valueOf(4)) - .transferDeliveryTag(new Binary(("transaction-" + 4).getBytes(StandardCharsets.UTF_8))) - .transferPayloadData(discharge).transfer(); + .transfer() + .txnSendDischarge(txnState, false); Detach transactionCoordinatorDetach = interaction.consumeResponse().getLatestResponse(Detach.class); Error transactionCoordinatorDetachError = transactionCoordinatorDetach.getError(); diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/queue/QueueDeletionTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/queue/QueueDeletionTest.java index d6eb3d1..39a8a9b 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/queue/QueueDeletionTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/queue/QueueDeletionTest.java @@ -171,7 +171,7 @@ public class QueueDeletionTest extends BrokerAdminUsingTestBase assertThat(receivedDetach.getError().getCondition(), is(AmqpError.RESOURCE_DELETED)); assertThat(receivedDetach.getHandle(), is(equalTo(attach.getHandle()))); - interaction.discharge(txnState, false); + interaction.txnSendDischarge(txnState, false); assertTransactionRollbackOnly(interaction, txnState); } @@ -244,7 +244,7 @@ public class QueueDeletionTest extends BrokerAdminUsingTestBase assertThat(receivedDetach.getError().getCondition(), is(AmqpError.RESOURCE_DELETED)); assertThat(receivedDetach.getHandle(), is(equalTo(attach.getHandle()))); - interaction.discharge(txnState, false); + interaction.txnSendDischarge(txnState, false); assertTransactionRollbackOnly(interaction, txnState); } diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java index 1cf1ff0..6ac8e7f 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java @@ -70,7 +70,7 @@ public class MessageFormat extends BrokerAdminUsingTestBase { try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { - QpidByteBuffer[] payloads = Utils.splitPayload("testData", 2); + QpidByteBuffer[] payloads = Utils.splitPayload(getTestName(), 2); final Response<?> latestResponse = transport.newInteraction() .negotiateProtocol().consumeResponse() diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java index ed9e58d..3b421c7 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java @@ -49,6 +49,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.End; import org.apache.qpid.server.protocol.v1_0.type.transport.Flow; import org.apache.qpid.server.protocol.v1_0.type.transport.Open; import org.apache.qpid.server.protocol.v1_0.type.transport.Role; +import org.apache.qpid.tests.protocol.ChannelClosedResponse; import org.apache.qpid.tests.protocol.v1_0.FrameTransport; import org.apache.qpid.tests.protocol.v1_0.Interaction; import org.apache.qpid.tests.protocol.SpecificationTest; @@ -229,9 +230,11 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase { payload.dispose(); } - Response<?> latestResponse = interaction.consumeResponse(new Class<?>[] {null}).getLatestResponse(); - assertThat(latestResponse, is(nullValue())); + interaction.consumeResponse(null, Flow.class); } + String secondMessage = getTestName() + "_2"; + Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, secondMessage); + assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(secondMessage))); } @Test @@ -387,7 +390,7 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase payload.dispose(); } - interaction.consumeResponse(Detach.class, End.class, Close.class); + interaction.consumeResponse(Detach.class, End.class, Close.class, ChannelClosedResponse.class); } } } diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java index 7950741..42f5dfc 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java @@ -38,6 +38,7 @@ import static org.junit.Assume.assumeThat; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.TreeSet; @@ -62,7 +63,6 @@ import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted; import org.apache.qpid.server.protocol.v1_0.type.messaging.Header; import org.apache.qpid.server.protocol.v1_0.type.messaging.Received; import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected; -import org.apache.qpid.server.protocol.v1_0.type.transaction.Discharge; import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError; import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; import org.apache.qpid.server.protocol.v1_0.type.transport.Begin; @@ -78,6 +78,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode; import org.apache.qpid.server.protocol.v1_0.type.transport.Role; import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode; import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer; +import org.apache.qpid.tests.protocol.ChannelClosedResponse; import org.apache.qpid.tests.protocol.Response; import org.apache.qpid.tests.protocol.SpecificationTest; import org.apache.qpid.tests.protocol.v1_0.FrameTransport; @@ -121,13 +122,13 @@ public class TransferTest extends BrokerAdminUsingTestBase @Test @SpecificationTest(section = "1.3.4", - description = "Transfer without mandatory fields should result in a decoding error.") + description = "mandatory [...] a non null value for the field is always encoded.") public void emptyTransfer() throws Exception { try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { - Close responseClose = transport.newInteraction() - .negotiateProtocol().consumeResponse() + Interaction interact = transport.newInteraction(); + Response<?> response = interact.negotiateProtocol().consumeResponse() .open().consumeResponse(Open.class) .begin().consumeResponse(Begin.class) .attachRole(Role.SENDER) @@ -136,9 +137,27 @@ public class TransferTest extends BrokerAdminUsingTestBase .transferHandle(null) .transfer() .consumeResponse() - .getLatestResponse(Close.class); - assertThat(responseClose.getError(), is(notNullValue())); - assertThat(responseClose.getError().getCondition(), equalTo(AmqpError.DECODE_ERROR)); + .getLatestResponse(); + + assertThat(response.getBody(), is(notNullValue())); + + if (response.getBody() instanceof Close) + { + final Close responseClose = (Close)response.getBody(); + assertThat(responseClose.getError(), is(notNullValue())); + assertThat(responseClose.getError().getCondition(), equalTo(AmqpError.DECODE_ERROR)); + + interact.close().sync(); + } + else if (response.getBody() instanceof End) + { + final End responseEnd = (End)response.getBody(); + assertThat(responseEnd.getError(), is(notNullValue())); + assertThat(responseEnd.getError().getCondition(), equalTo(AmqpError.DECODE_ERROR)); + + interact.end().doCloseConnection(); + } + transport.assertNoMoreResponses(); } } @@ -158,10 +177,11 @@ public class TransferTest extends BrokerAdminUsingTestBase .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME) .attach().consumeResponse(Attach.class) .consumeResponse(Flow.class) + .transferDeliveryId() .transferDeliveryTag(null) .transferPayloadData(getTestName()) .transfer(); - interaction.consumeResponse(Detach.class, End.class, Close.class); + interaction.consumeResponse(Detach.class, End.class, Close.class, ChannelClosedResponse.class); } } @@ -182,6 +202,7 @@ public class TransferTest extends BrokerAdminUsingTestBase .attachHandle(linkHandle) .attach().consumeResponse(Attach.class) .consumeResponse(Flow.class) + .transferDeliveryId() .transferHandle(linkHandle) .transferPayloadData(getTestName()) .transfer() @@ -283,12 +304,18 @@ public class TransferTest extends BrokerAdminUsingTestBase @Test @SpecificationTest(section = "2.7.5", - description = "If the negotiated link value is first, then it is illegal to set this field to second.") + description = "rcv-settle-mode " + + "If first, this indicates that the receiver MUST settle the delivery once it has arrived" + + " without waiting for the sender to settle first." + + " If second, this indicates that the receiver MUST NOT settle until sending its disposition" + + " to the sender and receiving a settled disposition from the sender." + + " If not set, this value is defaulted to the value negotiated on link attach." + + " If the negotiated link value is first, then it is illegal to set this field to second.") public void transferReceiverSettleModeCannotBeSecondWhenLinkModeIsFirst() throws Exception { try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { - Detach detach = transport.newInteraction() + Response<?> response = transport.newInteraction() .negotiateProtocol().consumeResponse() .open().consumeResponse(Open.class) .begin().consumeResponse(Begin.class) @@ -301,10 +328,24 @@ public class TransferTest extends BrokerAdminUsingTestBase .transferRcvSettleMode(ReceiverSettleMode.SECOND) .transfer() .consumeResponse() - .getLatestResponse(Detach.class); - Error error = detach.getError(); - assertThat(error, is(notNullValue())); - assertThat(error.getCondition(), is(equalTo(AmqpError.INVALID_FIELD))); + .getLatestResponse(); + + if (response.getBody() instanceof Detach) + { + final Detach detach = (Detach) response.getBody(); + Error error = detach.getError(); + assertThat(error, is(notNullValue())); + assertThat(error.getCondition(), is(equalTo(AmqpError.INVALID_FIELD))); + } + else + { + if (response.getBody() instanceof Disposition) + { + // clean up + Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME); + } + fail("it is illegal to set transfer 'rcv-settle-mode' to 'second' when link 'rcv-settle-mode' is set to 'first'"); + } } } @@ -366,6 +407,7 @@ public class TransferTest extends BrokerAdminUsingTestBase Rejected.REJECTED_SYMBOL) .attach().consumeResponse(Attach.class) .consumeResponse(Flow.class) + .transferDeliveryId() .transferPayload(messageEncoder.getPayload()) .transferRcvSettleMode(ReceiverSettleMode.FIRST) .transfer() @@ -411,6 +453,7 @@ public class TransferTest extends BrokerAdminUsingTestBase .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL) .attach().consumeResponse(Attach.class) .consumeResponse(Flow.class) + .transferDeliveryId() .transferPayload(messageEncoder.getPayload()) .transferRcvSettleMode(ReceiverSettleMode.FIRST) .transfer() @@ -453,7 +496,7 @@ public class TransferTest extends BrokerAdminUsingTestBase .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME) .attach().consumeResponse() .flowIncomingWindow(UnsignedInteger.ONE) - .flowNextIncomingId(UnsignedInteger.ZERO) + .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount() .flowOutgoingWindow(UnsignedInteger.ZERO) .flowNextOutgoingId(UnsignedInteger.ZERO) .flowLinkCredit(UnsignedInteger.ONE) @@ -493,7 +536,7 @@ public class TransferTest extends BrokerAdminUsingTestBase .attachRcvSettleMode(ReceiverSettleMode.FIRST) .attach().consumeResponse() .flowIncomingWindow(UnsignedInteger.ONE) - .flowNextIncomingId(UnsignedInteger.ZERO) + .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount() .flowOutgoingWindow(UnsignedInteger.ZERO) .flowNextOutgoingId(UnsignedInteger.ZERO) .flowLinkCredit(UnsignedInteger.ONE) @@ -519,8 +562,6 @@ public class TransferTest extends BrokerAdminUsingTestBase @SpecificationTest(section = "2.6.12", description = "Transferring A Message.") public void receiveTransferReceiverSettleSecond() throws Exception { - Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName()); - try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { final Interaction interaction = transport.newInteraction() @@ -532,19 +573,22 @@ public class TransferTest extends BrokerAdminUsingTestBase .attachRcvSettleMode(ReceiverSettleMode.SECOND) .attach().consumeResponse() .flowIncomingWindow(UnsignedInteger.ONE) - .flowNextIncomingId(UnsignedInteger.ZERO) + .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount() .flowOutgoingWindow(UnsignedInteger.ZERO) .flowNextOutgoingId(UnsignedInteger.ZERO) .flowLinkCredit(UnsignedInteger.ONE) .flowHandleFromLinkHandle() - .flow() - .receiveDelivery() - .decodeLatestDelivery(); + .flow(); - Object data = interaction.getDecodedLatestDelivery(); + Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName()); + + Object data = interaction.receiveDelivery() + .decodeLatestDelivery() + .getDecodedLatestDelivery(); assertThat(data, is(equalTo(getTestName()))); Disposition disposition = interaction.dispositionSettled(false) + .dispositionFirstFromLatestDelivery() .dispositionRole(Role.RECEIVER) .dispositionState(new Accepted()) .disposition() @@ -552,8 +596,11 @@ public class TransferTest extends BrokerAdminUsingTestBase .getLatestResponse(Disposition.class); assertThat(disposition.getSettled(), is(true)); - interaction.consumeResponse(null, Flow.class); - + interaction.dispositionSettled(true) + .dispositionFirstFromLatestDelivery() + .dispositionRole(Role.RECEIVER) + .dispositionState(new Accepted()) + .disposition(); } } @@ -561,8 +608,6 @@ public class TransferTest extends BrokerAdminUsingTestBase @SpecificationTest(section = "2.6.12", description = "Transferring A Message.") public void receiveTransferReceiverSettleSecondWithRejectedOutcome() throws Exception { - Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName()); - try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { final Interaction interaction = transport.newInteraction() @@ -575,17 +620,20 @@ public class TransferTest extends BrokerAdminUsingTestBase .attachRcvSettleMode(ReceiverSettleMode.SECOND) .attach().consumeResponse() .flowIncomingWindow(UnsignedInteger.ONE) - .flowNextIncomingId(UnsignedInteger.ZERO) + .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount() .flowOutgoingWindow(UnsignedInteger.ZERO) .flowNextOutgoingId(UnsignedInteger.ZERO) .flowLinkCredit(UnsignedInteger.ONE) .flowHandleFromLinkHandle() .flow(); + Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName()); + Object data = interaction.receiveDelivery().decodeLatestDelivery().getDecodedLatestDelivery(); assertThat(data, is(equalTo(getTestName()))); interaction.dispositionSettled(false) + .dispositionFirstFromLatestDelivery() .dispositionRole(Role.RECEIVER) .dispositionState(new Rejected()) .disposition() @@ -599,9 +647,11 @@ public class TransferTest extends BrokerAdminUsingTestBase Disposition disposition = interaction.getLatestResponse(Disposition.class); assertThat(disposition.getSettled(), is(true)); - interaction.consumeResponse(null, Flow.class); - - + interaction.dispositionSettled(true) + .dispositionFirstFromLatestDelivery() + .dispositionRole(Role.RECEIVER) + .dispositionState(new Rejected()) + .disposition(); } assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName()))); @@ -627,7 +677,7 @@ public class TransferTest extends BrokerAdminUsingTestBase .attachSourceDefaultOutcome(null) .attach().consumeResponse() .flowIncomingWindow(UnsignedInteger.ONE) - .flowNextIncomingId(UnsignedInteger.ZERO) + .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount() .flowOutgoingWindow(UnsignedInteger.ZERO) .flowNextOutgoingId(UnsignedInteger.ZERO) .flowLinkCredit(UnsignedInteger.ONE) @@ -640,6 +690,7 @@ public class TransferTest extends BrokerAdminUsingTestBase assertThat(data, is(equalTo(getTestName()))); Disposition disposition = interaction.dispositionSettled(false) + .dispositionFirstFromLatestDelivery() .dispositionRole(Role.RECEIVER) .dispositionState(null) .disposition() @@ -658,7 +709,6 @@ public class TransferTest extends BrokerAdminUsingTestBase + " non-terminal delivery states to the sender") public void receiveTransferReceiverIndicatesNonTerminalDeliveryState() throws Exception { - String testMessageData; try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { final Interaction interaction = transport.newInteraction(); @@ -667,19 +717,13 @@ public class TransferTest extends BrokerAdminUsingTestBase .openMaxFrameSize(UnsignedInteger.valueOf(4096)) .open().consumeResponse() .getLatestResponse(Open.class); - - int negotiatedFrameSize = open.getMaxFrameSize().intValue(); - testMessageData = Stream.generate(() -> "*").limit(negotiatedFrameSize).collect(Collectors.joining()); - - Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, testMessageData); - interaction.begin().consumeResponse() .attachRole(Role.RECEIVER) .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME) .attachRcvSettleMode(ReceiverSettleMode.SECOND) .attach().consumeResponse() .flowIncomingWindow(UnsignedInteger.ONE) - .flowNextIncomingId(UnsignedInteger.ZERO) + .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount() .flowOutgoingWindow(UnsignedInteger.ZERO) .flowNextOutgoingId(UnsignedInteger.ZERO) .flowLinkCredit(UnsignedInteger.ONE) @@ -687,6 +731,11 @@ public class TransferTest extends BrokerAdminUsingTestBase .flow() .sync(); + final int negotiatedFrameSize = open.getMaxFrameSize().intValue(); + final String testMessageData = Stream.generate(() -> "*").limit(negotiatedFrameSize).collect(Collectors.joining()); + + Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, testMessageData); + MessageDecoder messageDecoder = new MessageDecoder(); Transfer first = interaction.consumeResponse(Transfer.class) @@ -745,14 +794,14 @@ public class TransferTest extends BrokerAdminUsingTestBase assumeThat(attach.getSndSettleMode(), is(equalTo(SenderSettleMode.SETTLED))); interaction.flowIncomingWindow(UnsignedInteger.ONE) - .flowNextIncomingId(UnsignedInteger.ZERO) - .flowOutgoingWindow(UnsignedInteger.ZERO) - .flowNextOutgoingId(UnsignedInteger.ZERO) - .flowLinkCredit(UnsignedInteger.ONE) - .flowHandleFromLinkHandle() - .flow(); + .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount() + .flowOutgoingWindow(UnsignedInteger.ZERO) + .flowNextOutgoingId(UnsignedInteger.ZERO) + .flowLinkCredit(UnsignedInteger.ONE) + .flowHandleFromLinkHandle() + .flow(); - List<Transfer> transfers = interaction.receiveDelivery().getLatestDelivery(); + final List<Transfer> transfers = interaction.receiveDelivery().getLatestDelivery(); final AtomicBoolean isSettled = new AtomicBoolean(); transfers.forEach(transfer -> { if (Boolean.TRUE.equals(transfer.getSettled())) { isSettled.set(true);}}); @@ -762,10 +811,6 @@ public class TransferTest extends BrokerAdminUsingTestBase interaction.doCloseConnection(); } - if (getBrokerAdmin().isQueueDepthSupported()) - { - assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0))); - } Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, "test"); assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo("test"))); } @@ -793,17 +838,13 @@ public class TransferTest extends BrokerAdminUsingTestBase .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME) .attach() .consumeResponse(Attach.class) - .consumeResponse(Flow.class); - - Flow flow = interaction.getLatestResponse(Flow.class); - assertThat(flow.getLinkCredit().intValue(), is(greaterThan(1))); - - interaction.transferDeliveryId(UnsignedInteger.ZERO) + .consumeResponse(Flow.class) + .transferDeliveryId() .transferDeliveryTag(deliveryTag) .transferPayloadData(content1) .transfer() .transferDeliveryTag(deliveryTag) - .transferDeliveryId(UnsignedInteger.ONE) + .transferDeliveryId() .transferPayloadData(getTestName() + "_2") .transfer() .sync(); @@ -1019,28 +1060,21 @@ public class TransferTest extends BrokerAdminUsingTestBase interaction.txnAttachCoordinatorLink(txnState) .txnDeclare(txnState); - interaction.transferDeliveryId(UnsignedInteger.ONE) + interaction.transferDeliveryId() .transferDeliveryTag(new Binary("A".getBytes(StandardCharsets.UTF_8))) .transferPayloadData(contents[0]) .transfer() - .transferDeliveryId(UnsignedInteger.valueOf(2)) + .transferDeliveryId() .transferDeliveryTag(new Binary("B".getBytes(StandardCharsets.UTF_8))) .transferPayloadData(contents[1]) .transfer() - .transferDeliveryId(UnsignedInteger.valueOf(3)) + .transferDeliveryId() .transferDeliveryTag(new Binary("C".getBytes(StandardCharsets.UTF_8))) .transferTransactionalState(txnState.getCurrentTransactionId()) .transferPayloadData(contents[2]) .transfer(); - final Discharge discharge = new Discharge(); - discharge.setTxnId(txnState.getCurrentTransactionId()); - discharge.setFail(false); - - interaction.transferHandle(txnState.getHandle()) - .transferDeliveryId(UnsignedInteger.valueOf(4)) - .transferDeliveryTag(new Binary(("transaction-" + 4).getBytes(StandardCharsets.UTF_8))) - .transferPayloadData(discharge).transfer(); + interaction.txnSendDischarge(txnState, false); assertDeliveries(interaction, Sets.newTreeSet(Arrays.asList(UnsignedInteger.ONE, UnsignedInteger.valueOf(2), @@ -1069,39 +1103,32 @@ public class TransferTest extends BrokerAdminUsingTestBase .attachRcvSettleMode(ReceiverSettleMode.FIRST) .attach().consumeResponse() .flowIncomingWindow(UnsignedInteger.valueOf(numberOfMessages)) - .flowNextIncomingId(UnsignedInteger.ZERO) + .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount() .flowOutgoingWindow(UnsignedInteger.ZERO) .flowNextOutgoingId(UnsignedInteger.ZERO) .flowLinkCredit(UnsignedInteger.valueOf(numberOfMessages)) .flowHandleFromLinkHandle() .flow(); - for (int i = 0; i < contents.length; i++) + UnsignedInteger firstDeliveryId = null; + for (final String content : contents) { interaction.receiveDelivery(Flow.class).decodeLatestDelivery(); Object data = interaction.getDecodedLatestDelivery(); - assertThat(data, is(equalTo(contents[i]))); - assertThat(interaction.getLatestDeliveryId(), is(equalTo(UnsignedInteger.valueOf(i)))); + assertThat(data, is(equalTo(content))); + if (firstDeliveryId == null) + { + firstDeliveryId = interaction.getLatestDeliveryId(); + } } interaction.dispositionSettled(true) .dispositionRole(Role.RECEIVER) - .dispositionFirst(UnsignedInteger.ZERO) + .dispositionFirst(firstDeliveryId) .dispositionLast(interaction.getLatestDeliveryId()) .dispositionState(new Accepted()) .disposition(); - - // make sure sure the disposition is handled by making drain request - interaction.flowLinkCredit(UnsignedInteger.ONE) - .flowNextIncomingId(UnsignedInteger.valueOf(numberOfMessages)) - .flowDrain(Boolean.TRUE) - .flow() - .consumeResponse(Flow.class); - - if (getBrokerAdmin().isQueueDepthSupported()) - { - assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0))); - } + interaction.doCloseConnection(); } final String messageText = getTestName() + "_" + 4; @@ -1116,7 +1143,6 @@ public class TransferTest extends BrokerAdminUsingTestBase { final int numberOfMessages = 4; final String[] contents = Utils.createTestMessageContents(numberOfMessages, getTestName()); - Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, contents); try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { @@ -1130,19 +1156,22 @@ public class TransferTest extends BrokerAdminUsingTestBase .attachHandle(UnsignedInteger.ZERO) .attach().consumeResponse() .flowIncomingWindow(UnsignedInteger.valueOf(numberOfMessages)) - .flowNextIncomingId(UnsignedInteger.ZERO) + .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount() .flowOutgoingWindow(UnsignedInteger.ZERO) .flowNextOutgoingId(UnsignedInteger.ZERO) .flowLinkCredit(UnsignedInteger.valueOf(numberOfMessages)) .flowHandleFromLinkHandle() .flow(); - for (int i = 0; i < contents.length; i++) + Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, contents); + + final List<UnsignedInteger> deliveryIds = new ArrayList<>(); + for (final String content : contents) { interaction.receiveDelivery(Flow.class).decodeLatestDelivery(); Object data = interaction.getDecodedLatestDelivery(); - assertThat(data, is(equalTo(contents[i]))); - assertThat(interaction.getLatestDeliveryId(), is(equalTo(UnsignedInteger.valueOf(i)))); + assertThat(data, is(equalTo(content))); + deliveryIds.add(interaction.getLatestDeliveryId()); } final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ONE); @@ -1151,31 +1180,18 @@ public class TransferTest extends BrokerAdminUsingTestBase interaction.dispositionSettled(true) .dispositionRole(Role.RECEIVER) - .dispositionFirst(UnsignedInteger.ZERO) - .dispositionLast(UnsignedInteger.ONE) + .dispositionFirst(deliveryIds.get(0)) + .dispositionLast(deliveryIds.get(1)) .dispositionState(new Accepted()) .disposition() .dispositionSettled(true) .dispositionRole(Role.RECEIVER) - .dispositionFirst(UnsignedInteger.valueOf(2)) - .dispositionLast(UnsignedInteger.valueOf(3)) + .dispositionFirst(deliveryIds.get(2)) + .dispositionLast(deliveryIds.get(3)) .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted()) .disposition(); - - final Discharge discharge = new Discharge(); - discharge.setTxnId(txnState.getCurrentTransactionId()); - discharge.setFail(false); - - interaction.transferHandle(txnState.getHandle()) - .transferDeliveryId(UnsignedInteger.valueOf(4)) - .transferDeliveryTag(new Binary(("transaction-" + 4).getBytes(StandardCharsets.UTF_8))) - .transferPayloadData(discharge) - .transfer(); - - - final Flow coordinatorFlow = interaction.consume(Flow.class, Disposition.class); - assertThat(coordinatorFlow.getHandle(), is(equalTo(txnState.getHandle()))); + interaction.txnDischarge(txnState, false); } String messageText = getTestName() + "_" + 4; @@ -1203,6 +1219,10 @@ public class TransferTest extends BrokerAdminUsingTestBase expectedDeliveryIds.remove(deliveryId); }); } + else if (response.getBody() instanceof Flow) + { + // ignore flows + } } while (!expectedDeliveryIds.isEmpty()); } diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java index 32a96b0..85040c8 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java @@ -21,6 +21,7 @@ package org.apache.qpid.tests.protocol.v1_0.transaction; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.notNullValue; @@ -28,7 +29,6 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; -import static org.junit.Assume.assumeThat; import java.net.InetSocketAddress; import java.util.List; @@ -40,10 +40,7 @@ import org.apache.qpid.server.protocol.v1_0.type.Binary; import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted; import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected; -import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator; -import org.apache.qpid.server.protocol.v1_0.type.transaction.Declare; import org.apache.qpid.server.protocol.v1_0.type.transaction.Declared; -import org.apache.qpid.server.protocol.v1_0.type.transaction.Discharge; import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionError; import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; import org.apache.qpid.server.protocol.v1_0.type.transport.Begin; @@ -55,10 +52,10 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Open; import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode; import org.apache.qpid.server.protocol.v1_0.type.transport.Role; import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer; +import org.apache.qpid.tests.protocol.SpecificationTest; import org.apache.qpid.tests.protocol.v1_0.FrameTransport; import org.apache.qpid.tests.protocol.v1_0.Interaction; import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState; -import org.apache.qpid.tests.protocol.SpecificationTest; import org.apache.qpid.tests.protocol.v1_0.Utils; import org.apache.qpid.tests.utils.BrokerAdmin; import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; @@ -76,86 +73,76 @@ public class DischargeTest extends BrokerAdminUsingTestBase @Test @SpecificationTest(section = "4.3", - description = "If the coordinator is unable to complete the discharge, the coordinator MUST convey the error to the controller " - + "as a transaction-error. If the source for the link to the coordinator supports the rejected outcome, then the " - + "message MUST be rejected with this outcome carrying the transaction-error.") + description = "If the coordinator is unable to complete the discharge," + + " the coordinator MUST convey the error to the controller as a transaction-error." + + " If the source for the link to the coordinator supports the rejected outcome, then the " + + " message MUST be rejected with this outcome carrying the transaction-error.") public void dischargeUnknownTransactionIdWhenSourceSupportsRejectedOutcome() throws Exception { try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { + final InteractionTransactionalState txnState = new InteractionTransactionalState(UnsignedInteger.ZERO); final Interaction interaction = transport.newInteraction(); - final Disposition disposition = interaction.negotiateProtocol().consumeResponse() - .open().consumeResponse(Open.class) - .begin().consumeResponse(Begin.class) - .attachRole(Role.SENDER) - .attachSourceOutcomes(Rejected.REJECTED_SYMBOL) - .attachTarget(new Coordinator()) - .attach().consumeResponse(Attach.class) - .consumeResponse(Flow.class) - .transferPayloadData(new Declare()) - .transfer().consumeResponse() - .getLatestResponse(Disposition.class); - - assertThat(disposition.getSettled(), is(equalTo(true))); - assertThat(disposition.getState(), is(instanceOf(Declared.class))); - assertThat(((Declared) disposition.getState()).getTxnId(), is(notNullValue())); - - interaction.consumeResponse(Flow.class); - - final Discharge discharge = new Discharge(); - discharge.setTxnId(new Binary("nonExistingTransaction".getBytes(UTF_8))); - final Disposition dischargeDisposition = interaction.transferDeliveryId(UnsignedInteger.ONE) - .transferDeliveryTag(new Binary("discharge".getBytes(UTF_8))) - .transferPayloadData(discharge) - .transfer().consumeResponse() - .getLatestResponse(Disposition.class); - assertThat(dischargeDisposition.getState(), is(instanceOf(Rejected.class))); - final Error error = ((Rejected) dischargeDisposition.getState()).getError(); + interaction.negotiateProtocol().consumeResponse() + .open().consumeResponse(Open.class) + .begin().consumeResponse(Begin.class) + + .txnAttachCoordinatorLink(txnState, Rejected.REJECTED_SYMBOL) + .txnDeclare(txnState); + + assertThat(txnState.getDeliveryState(), is(instanceOf(Declared.class))); + assertThat(txnState.getCurrentTransactionId(), is(notNullValue())); + + txnState.setLastTransactionId(new Binary("nonExistingTransaction".getBytes(UTF_8))); + interaction.txnDischarge(txnState, false); + interaction.doCloseConnection(); + + assertThat(txnState.getDeliveryState(), is(instanceOf(Rejected.class))); + final Error error = ((Rejected) txnState.getDeliveryState()).getError(); assertThat(error, is(notNullValue())); - assertThat(error.getCondition(), is(equalTo(TransactionError.UNKNOWN_ID))); + + if (KIND_BROKER_J.equals(getBrokerAdmin().getKind())) + { + assertThat(error.getCondition(), is(equalTo(TransactionError.UNKNOWN_ID))); + } } } @Test @SpecificationTest(section = "4.3", - description = "If the coordinator is unable to complete the discharge, the coordinator MUST convey the error to the controller " - + "as a transaction-error. [...] If the source does not support " - + "the rejected outcome, the transactional resource MUST detach the link to the coordinator, with the detach " - + "performative carrying the transaction-error.") + description = "If the coordinator is unable to complete the discharge," + + " the coordinator MUST convey the error to the controller as a transaction-error." + + " [...] If the source does not support the rejected outcome, the transactional resource" + + " MUST detach the link to the coordinator, with the detach performative carrying" + + " the transaction-error.") public void dischargeUnknownTransactionIdWhenSourceDoesNotSupportRejectedOutcome() throws Exception { try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { + final InteractionTransactionalState txnState = new InteractionTransactionalState(UnsignedInteger.ZERO); final Interaction interaction = transport.newInteraction(); - final Disposition disposition = interaction.negotiateProtocol().consumeResponse() - .open().consumeResponse(Open.class) - .begin().consumeResponse(Begin.class) - .attachRole(Role.SENDER) - .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL) - .attachTarget(new Coordinator()) - .attach().consumeResponse(Attach.class) - .consumeResponse(Flow.class) - .transferPayloadData(new Declare()) - .transfer().consumeResponse() - .getLatestResponse(Disposition.class); - - - assertThat(disposition.getSettled(), is(equalTo(true))); - assertThat(disposition.getState(), is(instanceOf(Declared.class))); - assertThat(((Declared) disposition.getState()).getTxnId(), is(notNullValue())); - - interaction.consumeResponse(Flow.class); - - final Discharge discharge = new Discharge(); - discharge.setTxnId(new Binary("nonExistingTransaction".getBytes(UTF_8))); - final Detach detachResponse = interaction.transferDeliveryId(UnsignedInteger.ONE) - .transferDeliveryTag(new Binary("discharge".getBytes(UTF_8))) - .transferPayloadData(discharge) - .transfer().consumeResponse(Detach.class) - .getLatestResponse(Detach.class); - Error error = detachResponse.getError(); + interaction.negotiateProtocol().consumeResponse() + .open().consumeResponse(Open.class) + .begin().consumeResponse(Begin.class) + .txnAttachCoordinatorLink(txnState, Accepted.ACCEPTED_SYMBOL) + .txnDeclare(txnState); + + assertThat(txnState.getDeliveryState(), is(instanceOf(Declared.class))); + assertThat(txnState.getCurrentTransactionId(), is(notNullValue())); + + txnState.setLastTransactionId(new Binary("nonExistingTransaction".getBytes(UTF_8))); + interaction.txnSendDischarge(txnState, false); + + final Detach detachResponse = interaction.consumeResponse(Detach.class).getLatestResponse(Detach.class); + interaction.doCloseConnection(); + + final Error error = detachResponse.getError(); assertThat(error, is(notNullValue())); - assertThat(error.getCondition(), is(equalTo(TransactionError.UNKNOWN_ID))); + + if (KIND_BROKER_J.equals(getBrokerAdmin().getKind())) + { + assertThat(error.getCondition(), is(equalTo(TransactionError.UNKNOWN_ID))); + } } } @@ -167,33 +154,34 @@ public class DischargeTest extends BrokerAdminUsingTestBase + " desired transaction identifier and the outcome to be applied upon a successful discharge.") public void dischargeSettledAfterReceiverDetach() throws Exception { - assumeThat(getBrokerAdmin().isQueueDepthSupported(), is(true)); - - Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, "test message"); try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { final Interaction interaction = transport.newInteraction(); final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO); - List<Transfer> transfers = interaction.negotiateProtocol().consumeResponse() - .open().consumeResponse(Open.class) - .begin().consumeResponse(Begin.class) - - .txnAttachCoordinatorLink(txnState) - .txnDeclare(txnState) - - .attachRole(Role.RECEIVER) - .attachHandle(UnsignedInteger.ONE) - .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME) - .attachRcvSettleMode(ReceiverSettleMode.FIRST) - .attach().consumeResponse(Attach.class) - - .flowIncomingWindow(UnsignedInteger.ONE) - .flowLinkCredit(UnsignedInteger.ONE) - .flowHandleFromLinkHandle() - .flow() - - .receiveDelivery() - .getLatestDelivery(); + interaction.negotiateProtocol().consumeResponse() + .open().consumeResponse(Open.class) + .begin().consumeResponse(Begin.class) + + .txnAttachCoordinatorLink(txnState) + .txnDeclare(txnState) + + .attachRole(Role.RECEIVER) + .attachHandle(UnsignedInteger.ONE) + .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME) + .attachRcvSettleMode(ReceiverSettleMode.FIRST) + .attach().consumeResponse(Attach.class) + + .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount() + .flowNextOutgoingId(UnsignedInteger.ZERO) + .flowOutgoingWindow(UnsignedInteger.ZERO) + .flowIncomingWindow(UnsignedInteger.ONE) + .flowLinkCredit(UnsignedInteger.ONE) + .flowHandleFromLinkHandle() + .flow(); + + Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName()); + + List<Transfer> transfers = interaction.receiveDelivery().getLatestDelivery(); assertThat(transfers, is(notNullValue())); assertThat(transfers, is(not(empty()))); final UnsignedInteger deliveryId = transfers.get(0).getDeliveryId(); @@ -204,8 +192,15 @@ public class DischargeTest extends BrokerAdminUsingTestBase .disposition() .txnDischarge(txnState, false); - assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0))); + assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class))); + + interaction.doCloseConnection(); } + + String secondMessage = getTestName() + "_2"; + Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, secondMessage); + Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME); + assertThat(receivedMessage, is(equalTo(secondMessage))); } @Test @@ -219,8 +214,6 @@ public class DischargeTest extends BrokerAdminUsingTestBase + " was associated.") public void dischargeSettledAfterSenderDetach() throws Exception { - assumeThat(getBrokerAdmin().isQueueDepthSupported(), is(true)); - try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { final Interaction interaction = transport.newInteraction(); @@ -238,20 +231,21 @@ public class DischargeTest extends BrokerAdminUsingTestBase .attach().consumeResponse(Attach.class) .consumeResponse(Flow.class) + .transferDeliveryId() .transferTransactionalState(txnState.getCurrentTransactionId()) - .transferPayloadData("test message") + .transferPayloadData(getTestName()) .transferHandle(UnsignedInteger.ONE) .transfer().consumeResponse(Disposition.class) .detachHandle(UnsignedInteger.ONE) .detach().consumeResponse(Detach.class); - - assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0))); - interaction.txnDischarge(txnState, false); - assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1))); + assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class))); } + + final Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME); + assertThat(receivedMessage, is(equalTo(getTestName()))); } @Test @@ -262,8 +256,6 @@ public class DischargeTest extends BrokerAdminUsingTestBase + " reflect the outcome that was applied.") public void dischargeUnsettledAfterSenderClose() throws Exception { - assumeThat(getBrokerAdmin().isQueueDepthSupported(), is(true)); - try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { final Interaction interaction = transport.newInteraction(); @@ -282,21 +274,22 @@ public class DischargeTest extends BrokerAdminUsingTestBase .attach().consumeResponse(Attach.class) .consumeResponse(Flow.class) + .transferDeliveryId() .transferTransactionalState(txnState.getCurrentTransactionId()) - .transferPayloadData("test message") + .transferPayloadData(getTestName()) .transferHandle(UnsignedInteger.ONE) .transfer().consumeResponse(Disposition.class) .detachHandle(UnsignedInteger.ONE) .detachClose(true) .detach().consumeResponse(Detach.class); - - assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0))); - interaction.txnDischarge(txnState, false); - assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1))); + assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class))); } + + final Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME); + assertThat(receivedMessage, is(equalTo(getTestName()))); } } diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java index 853fdd9..f44790f 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java @@ -26,6 +26,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.junit.Assert.fail; +import static org.junit.Assume.assumeThat; import java.net.InetSocketAddress; import java.util.Collections; @@ -102,6 +103,7 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase .attach().consumeResponse(Attach.class) .consumeResponse(Flow.class) + .transferDeliveryId() .transferHandle(linkHandle) .transferPayloadData(getTestName()) .transferTransactionalState(txnState.getCurrentTransactionId()) @@ -116,9 +118,10 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase interaction.txnDischarge(txnState, false); - Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME); - assertThat(receivedMessage, is(equalTo(getTestName()))); + assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class))); } + Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME); + assertThat(receivedMessage, is(equalTo(getTestName()))); } @Test @@ -150,6 +153,7 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase .attach().consumeResponse(Attach.class) .consumeResponse(Flow.class) + .transferDeliveryId() .transferHandle(linkHandle) .transferPayloadData(getTestName()) .transferTransactionalState(txnState.getCurrentTransactionId()) @@ -164,16 +168,11 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase interaction.txnDischarge(txnState, true); - if (getBrokerAdmin().isQueueDepthSupported()) - { - assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0))); - } - else - { - final String content = getTestName() + "_2"; - Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, content); - assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(content))); - } + assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class))); + + final String content = getTestName() + "_2"; + Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, content); + assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(content))); } } @@ -204,9 +203,11 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME) .attachRcvSettleMode(ReceiverSettleMode.SECOND) .attachHandle(linkHandle) - .attach().consumeResponse(Attach.class) + .attach() + .consumeResponse(Attach.class) .consumeResponse(Flow.class) + .transferDeliveryId() .transferHandle(linkHandle) .transferPayloadData(getTestName()) .transferTransactionalState(txnState.getCurrentTransactionId()) @@ -225,6 +226,8 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase .disposition(); interaction.txnDischarge(txnState, false); + + assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class))); } assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName()))); } @@ -255,6 +258,7 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase .attach().consumeResponse(Attach.class) .consumeResponse(Flow.class) + .transferDeliveryId() .transferHandle(linkHandle) .transferPayloadData(getTestName()) .transferTransactionalState(integerToBinary(Integer.MAX_VALUE)) @@ -295,7 +299,7 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase .consumeResponse(Attach.class) .flowIncomingWindow(UnsignedInteger.ONE) - .flowNextIncomingId(UnsignedInteger.ZERO) + .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount() .flowOutgoingWindow(UnsignedInteger.ZERO) .flowNextOutgoingId(UnsignedInteger.ZERO) .flowLinkCredit(UnsignedInteger.ONE) @@ -313,6 +317,9 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted()) .disposition() .txnDischarge(txnState, false); + + + assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class))); } } @@ -344,7 +351,7 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase .consumeResponse(Attach.class) .flowIncomingWindow(UnsignedInteger.ONE) - .flowNextIncomingId(UnsignedInteger.ZERO) + .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount() .flowOutgoingWindow(UnsignedInteger.ZERO) .flowNextOutgoingId(UnsignedInteger.ZERO) .flowLinkCredit(UnsignedInteger.ONE) @@ -363,6 +370,8 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase .disposition() .txnDischarge(txnState, true); + assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class))); + Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME); assertThat(receivedMessage, is(equalTo(getTestName()))); } @@ -396,7 +405,7 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase .attach().consumeResponse(Attach.class) .flowIncomingWindow(UnsignedInteger.ONE) - .flowNextIncomingId(UnsignedInteger.ZERO) + .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount() .flowOutgoingWindow(UnsignedInteger.ZERO) .flowNextOutgoingId(UnsignedInteger.ZERO) .flowLinkCredit(UnsignedInteger.ONE) @@ -421,7 +430,10 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase .consumeResponse().getLatestResponse(); assertUnknownTransactionIdError(response); } - assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName()))); + finally + { + assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName()))); + } } @Ignore("TODO disposition is currently not being sent by Broker") @@ -479,6 +491,8 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase assertThat(((TransactionalState) settledDisposition.getState()).getOutcome(), is(instanceOf(Accepted.class))); interaction.txnDischarge(txnState, false); + + assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class))); } } @@ -514,7 +528,7 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase .consumeResponse(Attach.class) .flowIncomingWindow(UnsignedInteger.ONE) - .flowNextIncomingId(UnsignedInteger.ZERO) + .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount() .flowOutgoingWindow(UnsignedInteger.ZERO) .flowNextOutgoingId(UnsignedInteger.ZERO) .flowLinkCredit(UnsignedInteger.ONE) @@ -526,9 +540,6 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase List<Transfer> transfers = interaction.getLatestDelivery(); assertThat(transfers.size(), is(equalTo(1))); - Transfer transfer = transfers.get(0); - assertThat(transfer.getState(), is(instanceOf(TransactionalState.class))); - assertThat(((TransactionalState) transfer.getState()).getTxnId(), is(equalTo(txnState.getCurrentTransactionId()))); Object data = interaction.decodeLatestDelivery().getDecodedLatestDelivery(); assertThat(data, is(equalTo(getTestName()))); @@ -536,19 +547,20 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase interaction.dispositionSettled(true) .dispositionRole(Role.RECEIVER) .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted()) + .dispositionFirstFromLatestDelivery() .disposition() .txnDischarge(txnState, false); - if (getBrokerAdmin().isQueueDepthSupported()) - { - assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0))); - } - else - { - final String content = getTestName() + "_2"; - Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, content); - assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(content))); - } + + assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class))); + + Transfer transfer = transfers.get(0); + assumeThat(transfer.getState(), is(instanceOf(TransactionalState.class))); + assumeThat(((TransactionalState) transfer.getState()).getTxnId(), is(equalTo(txnState.getCurrentTransactionId()))); + + final String content = getTestName() + "_2"; + Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, content); + assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(content))); } } @@ -584,7 +596,7 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase .consumeResponse(Attach.class) .flowIncomingWindow(UnsignedInteger.ONE) - .flowNextIncomingId(UnsignedInteger.ZERO) + .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount() .flowOutgoingWindow(UnsignedInteger.ZERO) .flowNextOutgoingId(UnsignedInteger.ZERO) .flowLinkCredit(UnsignedInteger.ONE) @@ -596,9 +608,6 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase List<Transfer> transfers = interaction.getLatestDelivery(); assertThat(transfers.size(), is(equalTo(1))); - Transfer transfer = transfers.get(0); - assertThat(transfer.getState(), is(instanceOf(TransactionalState.class))); - assertThat(((TransactionalState) transfer.getState()).getTxnId(), is(equalTo(txnState.getCurrentTransactionId()))); Object data = interaction.decodeLatestDelivery().getDecodedLatestDelivery(); assertThat(data, is(equalTo(getTestName()))); @@ -609,11 +618,13 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase .disposition() .txnDischarge(txnState, true); - if (getBrokerAdmin().isQueueDepthSupported()) - { - assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1))); - } + assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class))); + assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName()))); + + Transfer transfer = transfers.get(0); + assumeThat(transfer.getState(), is(instanceOf(TransactionalState.class))); + assumeThat(((TransactionalState) transfer.getState()).getTxnId(), is(equalTo(txnState.getCurrentTransactionId()))); } } @@ -661,7 +672,10 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase assertUnknownTransactionIdError(response); } - assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName()))); + finally + { + assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName()))); + } } private void assertUnknownTransactionIdError(final Response<?> response) diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java index 1bfa6d3..448de42 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java @@ -21,6 +21,7 @@ package org.apache.qpid.tests.protocol.v1_0.transport.link; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; @@ -42,6 +43,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Flow; import org.apache.qpid.server.protocol.v1_0.type.transport.Open; import org.apache.qpid.server.protocol.v1_0.type.transport.Role; import org.apache.qpid.server.protocol.v1_0.type.transport.SessionError; +import org.apache.qpid.tests.protocol.Response; import org.apache.qpid.tests.protocol.SpecificationTest; import org.apache.qpid.tests.protocol.v1_0.FrameTransport; import org.apache.qpid.tests.protocol.v1_0.Interaction; @@ -53,7 +55,7 @@ public class FlowTest extends BrokerAdminUsingTestBase { @Test @SpecificationTest(section = "1.3.4", - description = "Flow without mandatory fields should result in a decoding error.") + description = "mandatory [...] a non null value for the field is always encoded.") public void emptyFlow() throws Exception { getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME); @@ -69,7 +71,7 @@ public class FlowTest extends BrokerAdminUsingTestBase .flowOutgoingWindow(null) .flowNextOutgoingId(null) .flow() - .consumeResponse() + .consumeResponse(Close.class) .getLatestResponse(Close.class); assertThat(responseClose.getError(), is(notNullValue())); assertThat(responseClose.getError().getCondition(), is(AmqpError.DECODE_ERROR)); @@ -89,6 +91,11 @@ public class FlowTest extends BrokerAdminUsingTestBase .open().consumeResponse(Open.class) .begin().consumeResponse(Begin.class) .flowEcho(true) + .flowOutgoingWindow(UnsignedInteger.ZERO) + .flowNextOutgoingId(UnsignedInteger.ZERO) + .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount() + .flowIncomingWindow(UnsignedInteger.ONE) + .flowHandle(null) .flow() .consumeResponse() .getLatestResponse(Flow.class); @@ -118,7 +125,10 @@ public class FlowTest extends BrokerAdminUsingTestBase .flowHandleFromLinkHandle() .flowAvailable(UnsignedInteger.valueOf(10)) .flowDeliveryCount(UnsignedInteger.ZERO) - .flowLinkCredit(UnsignedInteger.ZERO) + .flowLinkCredit(UnsignedInteger.ONE) + .flowOutgoingWindow(UnsignedInteger.ZERO) + .flowNextOutgoingId(UnsignedInteger.ZERO) + .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount() .flow().consumeResponse() .getLatestResponse(Flow.class); assertThat(responseFlow.getEcho(), not(equalTo(Boolean.TRUE))); @@ -146,7 +156,7 @@ public class FlowTest extends BrokerAdminUsingTestBase .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME) .attach().consumeResponse() .flowIncomingWindow(UnsignedInteger.ONE) - .flowNextIncomingId(UnsignedInteger.ZERO) + .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount() .flowOutgoingWindow(UnsignedInteger.ZERO) .flowNextOutgoingId(UnsignedInteger.ZERO) .flowLinkCredit(UnsignedInteger.ONE) @@ -156,7 +166,7 @@ public class FlowTest extends BrokerAdminUsingTestBase .decodeLatestDelivery() .dispositionSettled(true) .dispositionRole(Role.RECEIVER) - .dispositionFirst(interaction.getLatestDeliveryId()) + .dispositionFirstFromLatestDelivery() .dispositionLast(interaction.getLatestDeliveryId()) .dispositionState(new Accepted()) .disposition() @@ -215,7 +225,12 @@ public class FlowTest extends BrokerAdminUsingTestBase .open().consumeResponse(Open.class) .begin().consumeResponse(Begin.class) .flowEcho(true) - .flowHandle(UnsignedInteger.ONE) + .flowIncomingWindow(UnsignedInteger.ONE) + .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount() + .flowLinkCredit(UnsignedInteger.ONE) + .flowHandle(UnsignedInteger.valueOf(Integer.MAX_VALUE)) + .flowOutgoingWindow(UnsignedInteger.ZERO) + .flowNextOutgoingId(UnsignedInteger.ZERO) .flow() .consumeResponse().getLatestResponse(End.class); @@ -249,20 +264,22 @@ public class FlowTest extends BrokerAdminUsingTestBase UnsignedInteger remoteHandle = remoteAttach.getHandle(); assertThat(remoteHandle, is(notNullValue())); - Flow responseFlow = interaction.flowIncomingWindow(UnsignedInteger.valueOf(1)) - .flowNextIncomingId(UnsignedInteger.ZERO) + interaction.flowIncomingWindow(UnsignedInteger.ONE) + .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount() + .flowOutgoingWindow(UnsignedInteger.ZERO) + .flowNextOutgoingId(UnsignedInteger.ZERO) .flowLinkCredit(UnsignedInteger.ONE) .flowDrain(Boolean.FALSE) .flowEcho(Boolean.TRUE) .flowHandleFromLinkHandle() .flow() - .consumeResponse().getLatestResponse(Flow.class); - - assertThat(responseFlow.getHandle(), is(equalTo(remoteHandle))); - assertThat(responseFlow.getLinkCredit(), is(equalTo(UnsignedInteger.ONE))); - assertThat(responseFlow.getDrain(), is(equalTo(Boolean.FALSE))); + .consumeResponse(null, Flow.class); - responseFlow = interaction.flowLinkCredit(UnsignedInteger.ONE) + Flow responseFlow = interaction.flowIncomingWindow(UnsignedInteger.ONE) + .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount() + .flowOutgoingWindow(UnsignedInteger.ZERO) + .flowNextOutgoingId(UnsignedInteger.ZERO) + .flowLinkCredit(UnsignedInteger.ONE) .flowDrain(Boolean.TRUE) .flowEcho(Boolean.FALSE) .flowHandleFromLinkHandle() @@ -271,6 +288,7 @@ public class FlowTest extends BrokerAdminUsingTestBase assertThat(responseFlow.getHandle(), is(equalTo(remoteHandle))); assertThat(responseFlow.getLinkCredit(), is(equalTo(UnsignedInteger.ZERO))); + assertThat(responseFlow.getDrain(), is(equalTo(Boolean.TRUE))); } } @@ -286,8 +304,6 @@ public class FlowTest extends BrokerAdminUsingTestBase { BrokerAdmin brokerAdmin = getBrokerAdmin(); brokerAdmin.createQueue(BrokerAdmin.TEST_QUEUE_NAME); - String messageContent = getTestName(); - Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, messageContent); final InetSocketAddress addr = brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); try (FrameTransport transport = new FrameTransport(addr).connect()) @@ -304,35 +320,42 @@ public class FlowTest extends BrokerAdminUsingTestBase UnsignedInteger remoteHandle = remoteAttach.getHandle(); assertThat(remoteHandle, is(notNullValue())); - Object receivedMessageContent = interaction.flowIncomingWindow(UnsignedInteger.valueOf(1)) - .flowNextIncomingId(UnsignedInteger.ZERO) + interaction.flowIncomingWindow(UnsignedInteger.valueOf(1)) + .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount() .flowLinkCredit(UnsignedInteger.ONE) .flowDrain(Boolean.FALSE) .flowEcho(Boolean.FALSE) .flowHandleFromLinkHandle() + .flowOutgoingWindow(UnsignedInteger.ZERO) + .flowNextOutgoingId(UnsignedInteger.ZERO) .flow() - .receiveDelivery() - .decodeLatestDelivery() - .getDecodedLatestDelivery(); + .sync(); - assertThat(receivedMessageContent, is(equalTo(messageContent))); - UnsignedInteger firstDeliveryId = interaction.getLatestDeliveryId(); - assertThat(firstDeliveryId, is(equalTo(UnsignedInteger.ZERO))); + Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName()); - Flow responseFlow = interaction.flowNextIncomingId(UnsignedInteger.ONE) - .flowLinkCredit(UnsignedInteger.ONE) - .flowDrain(Boolean.TRUE) - .flowEcho(Boolean.FALSE) - .flowHandleFromLinkHandle() - .flow() - .consumeResponse().getLatestResponse(Flow.class); + final Object receivedMessageContent = interaction.receiveDelivery() + .decodeLatestDelivery() + .getDecodedLatestDelivery(); + + assertThat(receivedMessageContent, is(equalTo(getTestName()))); + + final Flow responseFlow = interaction.flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount() + .flowLinkCredit(UnsignedInteger.ONE) + .flowDrain(Boolean.TRUE) + .flowEcho(Boolean.FALSE) + .flowHandleFromLinkHandle() + .flowOutgoingWindow(UnsignedInteger.ZERO) + .flowNextOutgoingId(UnsignedInteger.ZERO) + .flowDeliveryCount() + .flow() + .consumeResponse().getLatestResponse(Flow.class); assertThat(responseFlow.getHandle(), is(equalTo(remoteHandle))); assertThat(responseFlow.getLinkCredit(), is(equalTo(UnsignedInteger.ZERO))); interaction.dispositionSettled(true) .dispositionRole(Role.RECEIVER) - .dispositionFirst(firstDeliveryId) + .dispositionFirst(interaction.getLatestDeliveryId()) .dispositionState(new Accepted()) .disposition() .sync(); @@ -373,9 +396,12 @@ public class FlowTest extends BrokerAdminUsingTestBase UnsignedInteger delta = UnsignedInteger.ONE; UnsignedInteger incomingWindow = UnsignedInteger.valueOf(3); Object receivedMessageContent1 = interaction.flowIncomingWindow(incomingWindow) - .flowNextIncomingId(UnsignedInteger.ZERO) + .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount() .flowLinkCredit(delta) .flowHandleFromLinkHandle() + .flowDeliveryCount() + .flowOutgoingWindow(UnsignedInteger.ZERO) + .flowNextOutgoingId(UnsignedInteger.ZERO) .flow() .receiveDelivery() .decodeLatestDelivery() @@ -383,12 +409,14 @@ public class FlowTest extends BrokerAdminUsingTestBase assertThat(receivedMessageContent1, is(equalTo(contents[0]))); UnsignedInteger firstDeliveryId = interaction.getLatestDeliveryId(); - assertThat(firstDeliveryId, is(equalTo(UnsignedInteger.ZERO))); Object receivedMessageContent2 = interaction.flowIncomingWindow(incomingWindow) - .flowNextIncomingId(UnsignedInteger.ONE) + .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount() .flowLinkCredit(delta) .flowHandleFromLinkHandle() + .flowOutgoingWindow(UnsignedInteger.ZERO) + .flowNextOutgoingId(UnsignedInteger.ZERO) + .flowDeliveryCount() .flow() .receiveDelivery() .decodeLatestDelivery() @@ -396,17 +424,18 @@ public class FlowTest extends BrokerAdminUsingTestBase assertThat(receivedMessageContent2, is(equalTo(contents[1]))); UnsignedInteger secondDeliveryId = interaction.getLatestDeliveryId(); - assertThat(secondDeliveryId, is(equalTo(UnsignedInteger.ONE))); // send session flow with echo=true to verify that no message is delivered without issuing a credit - Flow responseFlow = interaction.flowNextIncomingId(UnsignedInteger.valueOf(2)) - .flowLinkCredit(null) - .flowHandle(null) - .flowEcho(Boolean.TRUE) - .flow() - .consumeResponse().getLatestResponse(Flow.class); - - assertThat(responseFlow.getHandle(), is(nullValue())); + interaction.flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount() + .flowIncomingWindow(incomingWindow) + .flowLinkCredit(null) + .flowHandle(null) + .flowDeliveryCount(null) + .flowEcho(Boolean.TRUE) + .flowOutgoingWindow(UnsignedInteger.ZERO) + .flowNextOutgoingId(UnsignedInteger.ZERO) + .flow() + .consumeResponse(null, Flow.class); interaction.dispositionSettled(true) .dispositionRole(Role.RECEIVER) @@ -429,10 +458,8 @@ public class FlowTest extends BrokerAdminUsingTestBase { BrokerAdmin brokerAdmin = getBrokerAdmin(); brokerAdmin.createQueue(BrokerAdmin.TEST_QUEUE_NAME); - final String[] contents = Utils.createTestMessageContents(2, getTestName()); - Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, contents); - final InetSocketAddress addr = brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); + Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName()); try (FrameTransport transport = new FrameTransport(addr).connect()) { Interaction interaction = transport.newInteraction() @@ -447,36 +474,68 @@ public class FlowTest extends BrokerAdminUsingTestBase UnsignedInteger remoteHandle = remoteAttach.getHandle(); assertThat(remoteHandle, is(notNullValue())); - Object receivedMessageContent1 = interaction.flowIncomingWindow(UnsignedInteger.valueOf(2)) - .flowNextIncomingId(UnsignedInteger.ZERO) - .flowLinkCredit(UnsignedInteger.ONE) + UnsignedInteger incomingWindow = UnsignedInteger.valueOf(2); + Object receivedMessageContent1 = interaction.flowIncomingWindow(incomingWindow) + .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount() + .flowLinkCredit(incomingWindow) + .flowNextOutgoingId() .flowHandleFromLinkHandle() + .flowOutgoingWindow(UnsignedInteger.ZERO) + .flowNextOutgoingId(UnsignedInteger.ZERO) .flow() .receiveDelivery() .decodeLatestDelivery() .getDecodedLatestDelivery(); - - assertThat(receivedMessageContent1, is(equalTo(contents[0]))); - assertThat(interaction.getLatestDeliveryId(), is(equalTo(UnsignedInteger.ZERO))); - - Flow responseFlow = interaction.flowNextIncomingId(UnsignedInteger.ONE) - .flowLinkCredit(UnsignedInteger.ZERO) - .flowHandleFromLinkHandle() - .flowEcho(Boolean.TRUE) - .flow() - .consumeResponse().getLatestResponse(Flow.class); - - assertThat(responseFlow.getHandle(), is(equalTo(remoteHandle))); - assertThat(responseFlow.getLinkCredit(), is(equalTo(UnsignedInteger.ZERO))); - - interaction.dispositionSettled(true) - .dispositionRole(Role.RECEIVER) - .dispositionFirst(interaction.getLatestDeliveryId()) - .dispositionState(new Accepted()) - .disposition() - .sync(); + assertThat(receivedMessageContent1, is(equalTo(getTestName()))); + + final Response<?> response = interaction.flowIncomingWindow(incomingWindow) + .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount() + .flowLinkCredit(UnsignedInteger.ZERO) + .flowHandleFromLinkHandle() + .flowEcho(Boolean.TRUE) + .flowOutgoingWindow(UnsignedInteger.ZERO) + .flowNextOutgoingId(UnsignedInteger.ZERO) + .flowDeliveryCount() + .flow() + .consumeResponse(null, Flow.class) + .getLatestResponse(); + + if (response != null) + { + assertThat(response.getBody(), is(instanceOf(Flow.class))); + final Flow responseFlow = (Flow) response.getBody(); + assertThat(responseFlow.getEcho(), not(equalTo(Boolean.TRUE))); + assertThat(responseFlow.getHandle(), is(notNullValue())); + } + + final String message2 = getTestName() + "_2"; + Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, message2); + try + { + // send session flow with echo=true to verify that no message is delivered without issuing a credit + interaction.flowIncomingWindow(incomingWindow) + .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount() + .flowLinkCredit(null) + .flowHandle(null) + .flowDeliveryCount(null) + .flowEcho(Boolean.TRUE) + .flowOutgoingWindow(UnsignedInteger.ZERO) + .flowNextOutgoingId(UnsignedInteger.ZERO) + .flow() + .consumeResponse(null, Flow.class); + } + finally + { + assertThat(Utils.receiveMessage(addr, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(message2))); + + interaction.dispositionSettled(true) + .dispositionRole(Role.RECEIVER) + .dispositionFirst(interaction.getLatestDeliveryId()) + .dispositionState(new Accepted()) + .disposition() + .sync(); + } } - assertThat(Utils.receiveMessage(addr, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(contents[1]))); } @Test @@ -485,7 +544,7 @@ public class FlowTest extends BrokerAdminUsingTestBase + " available to consume the current link-credit. If set, the sender will" + " (after sending all available messages) advance the delivery-count as much as possible," + " consuming all link-credit, and send the flow state to the receiver.") - public void drainWithZeroCredits() throws Exception + public void drain() throws Exception { BrokerAdmin brokerAdmin = getBrokerAdmin(); brokerAdmin.createQueue(BrokerAdmin.TEST_QUEUE_NAME); @@ -507,16 +566,26 @@ public class FlowTest extends BrokerAdminUsingTestBase assertThat(remoteHandle, is(notNullValue())); Flow responseFlow = interaction.flowIncomingWindow(UnsignedInteger.valueOf(2)) - .flowNextIncomingId(UnsignedInteger.ZERO) - .flowLinkCredit(UnsignedInteger.ZERO) + .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount() + .flowLinkCredit(UnsignedInteger.valueOf(2)) .flowDrain(Boolean.TRUE) .flowHandleFromLinkHandle() + .flowOutgoingWindow(UnsignedInteger.ZERO) + .flowNextOutgoingId(UnsignedInteger.ZERO) .flow() - .consumeResponse().getLatestResponse(Flow.class); + .receiveDelivery() + .decodeLatestDelivery() + .consumeResponse(Flow.class).getLatestResponse(Flow.class); assertThat(responseFlow.getHandle(), is(equalTo(remoteHandle))); assertThat(responseFlow.getLinkCredit(), is(equalTo(UnsignedInteger.ZERO))); + + interaction.dispositionSettled(true) + .dispositionRole(Role.RECEIVER) + .dispositionFirstFromLatestDelivery() + .dispositionState(new Accepted()) + .disposition() + .sync(); } - assertThat(Utils.receiveMessage(addr, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName()))); } } diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java index 337c129..61884e3 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java @@ -591,11 +591,9 @@ public class ResumeDeliveriesTest extends BrokerAdminUsingTestBase interaction.doCloseConnection(); - if (getBrokerAdmin().isQueueDepthSupported()) - { - assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), - is(equalTo(0))); - } + final String content = getTestName() + "_2"; + Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, content); + assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), Matchers.is(Matchers.equalTo(content))); } } @@ -675,11 +673,9 @@ public class ResumeDeliveriesTest extends BrokerAdminUsingTestBase interaction.doCloseConnection(); - if (getBrokerAdmin().isQueueDepthSupported()) - { - assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), - Matchers.is(Matchers.equalTo(0))); - } + final String content = getTestName() + "_2"; + Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, content); + assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), Matchers.is(Matchers.equalTo(content))); } } diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java index f9640dd..801033e 100644 --- a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java +++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java @@ -44,9 +44,14 @@ public abstract class AbstractInteraction<I extends AbstractInteraction<I>> public I consumeResponse(final Class<?>... responseTypes) throws Exception { + final Set<Class<?>> acceptableResponseClasses = new HashSet<>(Arrays.asList(responseTypes)); + return consumeResponse(acceptableResponseClasses); + } + + protected I consumeResponse(final Set<Class<?>> acceptableResponseClasses) throws Exception + { sync(); _latestResponse = getNextResponse(); - final Set<Class<?>> acceptableResponseClasses = new HashSet<>(Arrays.asList(responseTypes)); if ((acceptableResponseClasses.isEmpty() && _latestResponse != null) || (acceptableResponseClasses.contains(null) && _latestResponse == null)) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
