This is an automated email from the ASF dual-hosted git repository. orudyy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
commit 1e416ebf2dc1a91ea3f1dc7332a66ee9de9e8316 Author: Alex Rudyy <[email protected]> AuthorDate: Fri Aug 23 13:28:06 2019 +0100 QPID-8350: [Tests][AMQP 1.0] Ignore sporadic flow performatives in transfer tests --- .../qpid/tests/protocol/v1_0/Interaction.java | 20 +++++-- .../anonymousterminus/AnonymousTerminusTest.java | 54 +++++------------- .../protocol/v1_0/messaging/MultiTransferTest.java | 17 +++--- .../protocol/v1_0/messaging/TransferTest.java | 66 ++++++++++------------ .../protocol/v1_0/transaction/DischargeTest.java | 8 +-- .../transaction/TransactionalTransferTest.java | 34 ++++++----- 6 files changed, 87 insertions(+), 112 deletions(-) diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java index 8c7709d..57d90d2 100644 --- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java +++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java @@ -1054,19 +1054,25 @@ public class Interaction extends AbstractInteraction<Interaction> private DeliveryState handleCoordinatorResponse() throws Exception { final Set<Class<?>> expected = new HashSet<>(Collections.singletonList(Disposition.class)); - if (_coordinatorCredits.decrementAndGet() == 0) { expected.add(Flow.class); } - final Map<Class<?>, ?> responses = consumeResponses(expected); + final Map<Class<?>, ?> responses = consumeResponses(expected, Collections.singleton(Flow.class)); final Disposition disposition = (Disposition) responses.get(Disposition.class); if (expected.contains(Flow.class)) { Flow flow = (Flow) responses.get(Flow.class); - _coordinatorCredits.set(flow.getLinkCredit().longValue()); + if (flow.getHandle().equals(getCoordinatorHandle())) + { + final UnsignedInteger linkCredit = flow.getLinkCredit(); + if (linkCredit != null) + { + 
_coordinatorCredits.set(linkCredit.longValue()); + } + } } if (!Boolean.TRUE.equals(disposition.getSettled())) { @@ -1075,13 +1081,15 @@ public class Interaction extends AbstractInteraction<Interaction> return disposition.getState(); } - private Map<Class<?>, ?> consumeResponses(final Set<Class<?>> responseTypes) + private Map<Class<?>, ?> consumeResponses(final Set<Class<?>> responseTypes, Set<Class<?>> ignore) throws Exception { - Map<Class<?>, Object> results = new HashMap<>(); + final Map<Class<?>, Object> results = new HashMap<>(); + final Set<Class<?>> expected = new HashSet<>(responseTypes); + expected.addAll(ignore); do { - Response<?> response = consumeResponse(responseTypes).getLatestResponse(); + Response<?> response = consumeResponse(expected).getLatestResponse(); if (response != null && response.getBody() instanceof FrameBody) { Class<?> bodyClass = response.getBody().getClass(); diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java index 369abf7..10a700c 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java @@ -38,7 +38,6 @@ import org.junit.Before; import org.junit.Test; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; -import org.apache.qpid.server.protocol.v1_0.SequenceNumber; import org.apache.qpid.server.protocol.v1_0.type.Binary; import org.apache.qpid.server.protocol.v1_0.type.DeliveryState; import org.apache.qpid.server.protocol.v1_0.type.Symbol; @@ -59,7 +58,6 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Flow; import 
org.apache.qpid.server.protocol.v1_0.type.transport.Open; import org.apache.qpid.server.protocol.v1_0.type.transport.Role; import org.apache.qpid.server.util.StringUtil; -import org.apache.qpid.tests.protocol.Response; import org.apache.qpid.tests.protocol.SpecificationTest; import org.apache.qpid.tests.protocol.v1_0.FrameTransport; import org.apache.qpid.tests.protocol.v1_0.Interaction; @@ -72,7 +70,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase { private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY"); private static final Symbol DELIVERY_TAG = Symbol.valueOf("delivery-tag"); - private static final String TEST_MESSAGE_CONTENT = "test"; + private InetSocketAddress _brokerAddress; private Binary _deliveryTag; @@ -110,8 +108,8 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase .transfer() .sync(); - Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME); - assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT))); + assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), + is(equalTo(getTestName()))); } } @@ -145,7 +143,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase .transferDeliveryTag(_deliveryTag) .transfer(); - Detach detach = interaction.consumeResponse(Detach.class).getLatestResponse(Detach.class); + final Detach detach = interaction.consume(Detach.class, Flow.class); Error error = detach.getError(); assertThat(error, is(notNullValue())); assertThat(error.getCondition(), is(equalTo(AmqpError.NOT_FOUND))); @@ -183,10 +181,9 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase .transferDeliveryId() .transferPayload(generateMessagePayloadToDestination(getNonExistingDestinationName())) .transferDeliveryTag(_deliveryTag) - .transfer() - .consumeResponse(); + .transfer(); - Disposition disposition = interaction.getLatestResponse(Disposition.class); + final Disposition disposition = 
interaction.consume(Disposition.class, Flow.class); assertThat(disposition.getSettled(), is(true)); @@ -233,7 +230,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase .transferDeliveryTag(_deliveryTag) .transfer(); - Detach detach = interaction.consumeResponse().getLatestResponse(Detach.class); + final Detach detach = interaction.consume(Detach.class, Flow.class); Error error = detach.getError(); assertThat(error, is(notNullValue())); assertThat(error.getCondition(), is(equalTo(AmqpError.NOT_FOUND))); @@ -275,7 +272,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class))); Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME); - assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT))); + assertThat(receivedMessage, is(equalTo(getTestName()))); } } @@ -305,7 +302,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase .transferSettled(Boolean.FALSE) .transfer(); - Disposition disposition = interaction.consumeResponse().getLatestResponse(Disposition.class); + final Disposition disposition = interaction.consume(Disposition.class, Flow.class); assertThat(disposition.getSettled(), is(true)); @@ -320,7 +317,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class))); Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME); - assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT))); + assertThat(receivedMessage, is(equalTo(getTestName()))); } } @@ -351,7 +348,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase .transferSettled(Boolean.FALSE) .transfer(); - Disposition disposition = interaction.consumeResponse().getLatestResponse(Disposition.class); + final Disposition disposition = interaction.consume(Disposition.class, 
Flow.class); assertThat(disposition.getSettled(), is(true)); @@ -400,7 +397,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase .transferSettled(Boolean.FALSE) .transfer(); - Detach senderLinkDetach = interaction.consumeResponse().getLatestResponse(Detach.class); + final Detach senderLinkDetach = interaction.consume(Detach.class, Flow.class); Error senderLinkDetachError = senderLinkDetach.getError(); assertThat(senderLinkDetachError, is(notNullValue())); assertThat(senderLinkDetachError.getCondition(), is(equalTo(AmqpError.NOT_FOUND))); @@ -537,7 +534,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase .transferSettled(Boolean.TRUE) .transfer().txnSendDischarge(false); - Detach transactionCoordinatorDetach = interaction.consumeResponse().getLatestResponse(Detach.class); + final Detach transactionCoordinatorDetach = interaction.consume(Detach.class, Flow.class); Error transactionCoordinatorDetachError = transactionCoordinatorDetach.getError(); assertThat(transactionCoordinatorDetachError, is(notNullValue())); assertThat(transactionCoordinatorDetachError.getCondition(), is(equalTo(TransactionError.TRANSACTION_ROLLBACK))); @@ -549,29 +546,6 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase return String.format("%sNonExisting%s", getTestName(), new StringUtil().randomAlphaNumericString(10)); } - private Disposition getDispositionForDeliveryId(final Interaction interaction, - final UnsignedInteger deliveryId) throws Exception - { - Disposition dischargeTransactionDisposition = null; - - SequenceNumber id = new SequenceNumber(deliveryId.intValue()); - do - { - Response<?> response = interaction.consumeResponse(Disposition.class, Flow.class).getLatestResponse(); - if (response.getBody() instanceof Disposition) - { - Disposition disposition = (Disposition) response.getBody(); - UnsignedInteger first = disposition.getFirst(); - UnsignedInteger last = disposition.getLast() == null ? 
disposition.getFirst() : disposition.getLast(); - if (new SequenceNumber(first.intValue()).compareTo(id) >= 0 && new SequenceNumber(last.intValue()).compareTo(id) <=0) - { - dischargeTransactionDisposition = disposition; - } - } - } while (dischargeTransactionDisposition == null); - return dischargeTransactionDisposition; - } - private Interaction openInteractionWithAnonymousRelayCapability(final FrameTransport transport) throws Exception { final Interaction interaction = transport.newInteraction(); @@ -590,7 +564,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase final Properties properties = new Properties(); properties.setTo(destinationName); messageEncoder.setProperties(properties); - messageEncoder.addData(TEST_MESSAGE_CONTENT); + messageEncoder.addData(getTestName()); return messageEncoder.getPayload(); } diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java index a5f7e7c..536d6ab 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java @@ -48,9 +48,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Flow; import org.apache.qpid.server.protocol.v1_0.type.transport.Open; import org.apache.qpid.server.protocol.v1_0.type.transport.Role; import org.apache.qpid.tests.protocol.ChannelClosedResponse; +import org.apache.qpid.tests.protocol.SpecificationTest; import org.apache.qpid.tests.protocol.v1_0.FrameTransport; import org.apache.qpid.tests.protocol.v1_0.Interaction; -import org.apache.qpid.tests.protocol.SpecificationTest; import org.apache.qpid.tests.protocol.v1_0.Utils; import org.apache.qpid.tests.utils.BrokerAdmin; import 
org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; @@ -100,8 +100,7 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase .transferMore(false) .transferPayload(payloads[1]) .transfer() - .consumeResponse() - .getLatestResponse(Disposition.class); + .consume(Disposition.class, Flow.class); for (final QpidByteBuffer payload : payloads) { @@ -158,15 +157,14 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase .transferDeliveryTag(null) .transferMore(false) .transferPayload(payloads[3]) - .transfer() - .consumeResponse(); - - Disposition disposition = interaction.getLatestResponse(Disposition.class); + .transfer().sync(); for (final QpidByteBuffer payload : payloads) { payload.dispose(); } + + Disposition disposition = interaction.consume(Disposition.class, Flow.class); assertThat(disposition.getFirst(), is(equalTo(deliveryId))); assertThat(disposition.getLast(), oneOf(null, deliveryId)); assertThat(disposition.getSettled(), is(equalTo(true))); @@ -201,8 +199,8 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase .transferDeliveryId(deliveryId) .transferDeliveryTag(deliveryTag) .transferMore(true) + .transferSettled(true) .transfer() - .sync() .transferPayload(null) .transferMore(null) .transferAborted(true) @@ -304,8 +302,7 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase Map<UnsignedInteger, Disposition> dispositionMap = new HashMap<>(); for (int i = 0; i < 2; i++) { - Disposition disposition = interaction.consumeResponse(Disposition.class) - .getLatestResponse(Disposition.class); + Disposition disposition = interaction.consume(Disposition.class, Flow.class); dispositionMap.put(disposition.getFirst(), disposition); assertThat(disposition.getLast(), oneOf(null, disposition.getFirst())); diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java 
b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java index c89898d..733ba94 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java @@ -192,8 +192,7 @@ public class TransferTest extends BrokerAdminUsingTestBase .transferHandle(linkHandle) .transferPayloadData(getTestName()) .transfer() - .consumeResponse() - .getLatestResponse(Disposition.class); + .consume(Disposition.class, Flow.class); assertThat(responseDisposition.getRole(), is(Role.RECEIVER)); assertThat(responseDisposition.getSettled(), is(Boolean.TRUE)); assertThat(responseDisposition.getState(), is(instanceOf(Accepted.class))); @@ -245,7 +244,7 @@ public class TransferTest extends BrokerAdminUsingTestBase .transferDeliveryTag(deliveryTag) .transfer(); - final Disposition disposition1 = interaction.consumeResponse().getLatestResponse(Disposition.class); + final Disposition disposition1 = interaction.consume(Disposition.class, Flow.class); final UnsignedInteger first = disposition1.getFirst(); final UnsignedInteger last = disposition1.getLast(); @@ -254,7 +253,7 @@ public class TransferTest extends BrokerAdminUsingTestBase if (last == null || first.equals(last)) { - final Disposition disposition2 = interaction.consumeResponse().getLatestResponse(Disposition.class); + final Disposition disposition2 = interaction.consume(Disposition.class, Flow.class); assertThat(disposition2.getFirst(), anyOf(is(UnsignedInteger.ZERO), is(UnsignedInteger.ONE))); assertThat(disposition2.getLast(), anyOf(nullValue(), is(UnsignedInteger.ZERO), is(UnsignedInteger.ONE))); assertThat(disposition2.getFirst(), is(not(equalTo(first)))); @@ -265,7 +264,8 @@ public class TransferTest extends BrokerAdminUsingTestBase @Test @SpecificationTest(section = "2.7.5", - description = "If first, this 
indicates that the receiver MUST settle the delivery once it has arrived without waiting for the sender to settle first") + description = "If first, this indicates that the receiver MUST settle the delivery once" + + " it has arrived without waiting for the sender to settle first") public void transferReceiverSettleModeFirst() throws Exception { try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) @@ -282,8 +282,7 @@ public class TransferTest extends BrokerAdminUsingTestBase .transferPayloadData(getTestName()) .transferRcvSettleMode(ReceiverSettleMode.FIRST) .transfer() - .consumeResponse() - .getLatestResponse(Disposition.class); + .consume(Disposition.class, Flow.class); assertThat(responseDisposition.getRole(), is(Role.RECEIVER)); assertThat(responseDisposition.getSettled(), is(Boolean.TRUE)); assertThat(responseDisposition.getState(), is(instanceOf(Accepted.class))); @@ -585,8 +584,7 @@ public class TransferTest extends BrokerAdminUsingTestBase .dispositionRole(Role.RECEIVER) .dispositionState(new Accepted()) .disposition() - .consumeResponse(Disposition.class) - .getLatestResponse(Disposition.class); + .consume(Disposition.class, Flow.class); assertThat(disposition.getSettled(), is(true)); interaction.dispositionSettled(true) @@ -689,8 +687,7 @@ public class TransferTest extends BrokerAdminUsingTestBase .dispositionRole(Role.RECEIVER) .dispositionState(null) .disposition() - .consumeResponse(Disposition.class) - .getLatestResponse(Disposition.class); + .consume(Disposition.class, Flow.class); assertThat(disposition.getSettled(), is(true)); interaction.consumeResponse(null, Flow.class); @@ -836,8 +833,7 @@ public class TransferTest extends BrokerAdminUsingTestBase .consumeResponse(Attach.class) .assertLatestResponse(Attach.class, this::assumeReceiverSettlesSecond) .consumeResponse(Flow.class) - .assertLatestResponse(Flow.class, - flow -> assumeThat(flow.getLinkCredit().intValue(), is(greaterThan(1)))) + .assertLatestResponse(Flow.class, 
this::assumeCreditsGreaterThanOne) .transferDeliveryId() .transferDeliveryTag(deliveryTag) .transferPayloadData(content1) @@ -898,12 +894,9 @@ public class TransferTest extends BrokerAdminUsingTestBase .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME) .attach() .consumeResponse(Attach.class) - .consumeResponse(Flow.class); - - Flow flow = interaction.getLatestResponse(Flow.class); - assertThat(flow.getLinkCredit().intValue(), is(greaterThan(1))); - - interaction.transferDeliveryId(UnsignedInteger.ZERO) + .consumeResponse(Flow.class) + .assertLatestResponse(Flow.class, this::assumeCreditsGreaterThanOne) + .transferDeliveryId(UnsignedInteger.ZERO) .transferDeliveryTag(deliveryTag) .transferPayloadData(contents[0]) .transferSettled(true) @@ -1202,24 +1195,16 @@ public class TransferTest extends BrokerAdminUsingTestBase { do { - Response<?> response = interaction.consumeResponse(Disposition.class, Flow.class).getLatestResponse(); - if (response.getBody() instanceof Disposition) - { - Disposition disposition = (Disposition) response.getBody(); - LongStream.rangeClosed(disposition.getFirst().longValue(), - disposition.getLast() == null - ? disposition.getFirst().longValue() - : disposition.getLast().longValue()) - .forEach(value -> { - UnsignedInteger deliveryId = expectedDeliveryIds.first(); - assertThat(value, is(equalTo(deliveryId.longValue()))); - expectedDeliveryIds.remove(deliveryId); - }); - } - else if (response.getBody() instanceof Flow) - { - // ignore flows - } + Disposition disposition = interaction.consume(Disposition.class, Flow.class); + LongStream.rangeClosed(disposition.getFirst().longValue(), + disposition.getLast() == null + ? 
disposition.getFirst().longValue() + : disposition.getLast().longValue()) + .forEach(value -> { + UnsignedInteger deliveryId = expectedDeliveryIds.first(); + assertThat(value, is(equalTo(deliveryId.longValue()))); + expectedDeliveryIds.remove(deliveryId); + }); } while (!expectedDeliveryIds.isEmpty()); } @@ -1243,9 +1228,16 @@ public class TransferTest extends BrokerAdminUsingTestBase private void assumeSufficientCredits(final Flow flow) { + assumeThat(flow.getLinkCredit(), is(notNullValue())); assumeThat(flow.getLinkCredit(), is(greaterThan(UnsignedInteger.ZERO))); } + private void assumeCreditsGreaterThanOne(final Flow flow) + { + assumeThat(flow.getLinkCredit(), is(notNullValue())); + assumeThat(flow.getLinkCredit(), is(greaterThan(UnsignedInteger.ONE))); + } + private void assumeReceiverSettlesSecond(final Attach attach) { assumeThat(attach.getRcvSettleMode(), is(equalTo(ReceiverSettleMode.SECOND))); diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java index 2036d1e..a22c8fc 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java @@ -228,11 +228,11 @@ public class DischargeTest extends BrokerAdminUsingTestBase .transferTransactionalStateFromCurrentTransaction() .transferPayloadData(getTestName()) .transferHandle(UnsignedInteger.ONE) - .transfer().consumeResponse(Disposition.class) + .transfer().consume(Disposition.class, Flow.class); - .detachHandle(UnsignedInteger.ONE) - .detach().consumeResponse(Detach.class); - interaction.txnDischarge(false); + interaction.detachHandle(UnsignedInteger.ONE) + .detach().consumeResponse(Detach.class) + .txnDischarge(false); 
assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class))); } diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java index 6a334dc..0827209 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java @@ -37,6 +37,7 @@ import org.junit.Ignore; import org.junit.Test; import org.apache.qpid.server.protocol.v1_0.type.Binary; +import org.apache.qpid.server.protocol.v1_0.type.ErrorCarryingFrameBody; import org.apache.qpid.server.protocol.v1_0.type.Symbol; import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted; @@ -106,8 +107,7 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase .transferPayloadData(getTestName()) .transferTransactionalStateFromCurrentTransaction() .transfer() - .consumeResponse(Disposition.class) - .getLatestResponse(Disposition.class); + .consume(Disposition.class, Flow.class); assertThat(responseDisposition.getRole(), is(Role.RECEIVER)); assertThat(responseDisposition.getSettled(), is(Boolean.TRUE)); @@ -155,8 +155,7 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase .transferPayloadData(getTestName()) .transferTransactionalStateFromCurrentTransaction() .transfer() - .consumeResponse(Disposition.class) - .getLatestResponse(Disposition.class); + .consume(Disposition.class, Flow.class); assertThat(responseDisposition.getRole(), is(Role.RECEIVER)); assertThat(responseDisposition.getSettled(), is(Boolean.TRUE)); @@ -242,7 +241,7 @@ public class TransactionalTransferTest extends 
BrokerAdminUsingTestBase final Interaction interaction = transport.newInteraction(); - Response<?> response = interaction.negotiateProtocol().consumeResponse() + ErrorCarryingFrameBody response = interaction.negotiateProtocol().consumeResponse() .open().consumeResponse(Open.class) .begin().consumeResponse(Begin.class) @@ -260,10 +259,11 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase .transferPayloadData(getTestName()) .transferTransactionalState(integerToBinary(Integer.MAX_VALUE)) .transfer() - .consumeResponse() - .getLatestResponse(); + .consume(ErrorCarryingFrameBody.class, Flow.class); - assertUnknownTransactionIdError(response); + final Error error = response.getError(); + assertThat(error, is(notNullValue())); + assertThat(error.getCondition(), equalTo(TransactionError.UNKNOWN_ID)); } } @@ -413,14 +413,17 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase Object data = interaction.decodeLatestDelivery().getDecodedLatestDelivery(); assertThat(data, is(equalTo(getTestName()))); - Response<?> response = interaction.dispositionSettled(true) + ErrorCarryingFrameBody response = interaction.dispositionSettled(true) .dispositionRole(Role.RECEIVER) .dispositionTransactionalState(integerToBinary(Integer.MAX_VALUE), new Accepted()) .dispositionFirst(deliveryId) .disposition() - .consumeResponse().getLatestResponse(); - assertUnknownTransactionIdError(response); + .consume(ErrorCarryingFrameBody.class, Flow.class); + + final Error error = response.getError(); + assertThat(error, is(notNullValue())); + assertThat(error.getCondition(), equalTo(TransactionError.UNKNOWN_ID)); } finally { @@ -630,7 +633,7 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { final Interaction interaction = transport.newInteraction(); - Response<?> response = interaction.negotiateProtocol() + ErrorCarryingFrameBody response = 
interaction.negotiateProtocol() .consumeResponse() .open() .consumeResponse(Open.class) @@ -653,10 +656,11 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase .flowProperties(Collections.singletonMap(Symbol.valueOf("txn-id"), integerToBinary(Integer.MAX_VALUE))) .flow() - .consumeResponse() - .getLatestResponse(); + .consume(ErrorCarryingFrameBody.class, Flow.class); - assertUnknownTransactionIdError(response); + final Error error = response.getError(); + assertThat(error, is(notNullValue())); + assertThat(error.getCondition(), equalTo(TransactionError.UNKNOWN_ID)); } finally { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
