This is an automated email from the ASF dual-hosted git repository. orudyy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
commit 605e727da226b503db95d3d2078e6196caf1aab6 Author: Alex Rudyy <[email protected]> AuthorDate: Fri Aug 23 13:39:35 2019 +0100 QPID-8350: [Tests][AMQP 1.0] Improve handling of amqp errors in tests --- .../qpid/tests/protocol/v1_0/DecodeErrorTest.java | 148 +++++++++------------ .../protocol/v1_0/messaging/MessageFormat.java | 42 ++---- .../protocol/v1_0/messaging/MultiTransferTest.java | 29 +++- .../protocol/v1_0/messaging/TransferTest.java | 120 ++++++++++------- .../v1_0/transport/connection/OpenTest.java | 23 ++-- .../protocol/v1_0/transport/link/AttachTest.java | 33 +++-- .../protocol/v1_0/transport/link/FlowTest.java | 35 +++-- .../v1_0/transport/link/LinkStealingTest.java | 5 +- .../protocol/v1_0/transport/session/BeginTest.java | 17 ++- 9 files changed, 238 insertions(+), 214 deletions(-) diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java index ce604c9..c0aaf45 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java @@ -25,8 +25,8 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; -import static org.junit.Assert.fail; import static org.junit.Assume.assumeThat; import java.net.InetSocketAddress; @@ -40,6 +40,7 @@ import org.junit.Test; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.protocol.v1_0.codec.StringWriter; +import org.apache.qpid.server.protocol.v1_0.type.ErrorCarryingFrameBody; import org.apache.qpid.server.protocol.v1_0.type.Symbol; import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue; @@ -52,13 +53,11 @@ import org.apache.qpid.server.protocol.v1_0.type.messaging.Source; import org.apache.qpid.server.protocol.v1_0.type.messaging.Target; import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; import org.apache.qpid.server.protocol.v1_0.type.transport.Begin; -import org.apache.qpid.server.protocol.v1_0.type.transport.Close; -import org.apache.qpid.server.protocol.v1_0.type.transport.Detach; -import org.apache.qpid.server.protocol.v1_0.type.transport.End; import org.apache.qpid.server.protocol.v1_0.type.transport.Error; import org.apache.qpid.server.protocol.v1_0.type.transport.Flow; import org.apache.qpid.server.protocol.v1_0.type.transport.Open; import org.apache.qpid.server.protocol.v1_0.type.transport.Role; +import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode; import org.apache.qpid.tests.protocol.Response; import org.apache.qpid.tests.protocol.SpecificationTest; import org.apache.qpid.tests.utils.BrokerAdmin; @@ -91,6 +90,7 @@ public class DecodeErrorTest extends BrokerAdminUsingTestBase .begin() .consumeResponse(Begin.class) .attachRole(Role.SENDER) + .attachSndSettleMode(SenderSettleMode.SETTLED) .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME) .attach() .consumeResponse(Attach.class) @@ -99,31 +99,24 @@ public class DecodeErrorTest extends BrokerAdminUsingTestBase flow -> assumeThat(flow.getLinkCredit(), is(greaterThan(UnsignedInteger.ZERO)))); - final List<QpidByteBuffer> payloads = buildInvalidMessage(); - try - { - try (QpidByteBuffer combinedPayload = QpidByteBuffer.concatenate(payloads)) - { - interaction.transferMessageFormat(UnsignedInteger.ZERO) - .transferPayload(combinedPayload) - .transfer(); - } - } - finally + try(final QpidByteBuffer payload = buildInvalidMessage()) { - payloads.forEach(QpidByteBuffer::dispose); + interaction.transferMessageFormat(UnsignedInteger.ZERO) + .transferPayload(payload) + .transfer(); } - final Detach detachResponse = interaction.consumeResponse() - .getLatestResponse(Detach.class); - assertThat(detachResponse.getError(), is(notNullValue())); - assertThat(detachResponse.getError().getCondition(), is(equalTo(DECODE_ERROR))); } + + final String validMessage = getTestName() + "_2"; + Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, validMessage); + assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(validMessage))); } @Test @SpecificationTest(section = "3.5.9", - description = "The value of this entry MUST be of a type which provides the lifetime-policy archetype.") + description = "Node Properties [...] lifetime-policy [...] " + + "The value of this entry MUST be of a type which provides the lifetime-policy archetype.") public void nodePropertiesLifetimePolicy() throws Exception { try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) @@ -143,24 +136,10 @@ public class DecodeErrorTest extends BrokerAdminUsingTestBase assertThat(latestResponse, is(notNullValue())); final Object responseBody = latestResponse.getBody(); - final Error error; - if (responseBody instanceof End) - { - error = ((End) responseBody).getError(); - } - else if (responseBody instanceof Close) - { - error = ((Close) responseBody).getError(); - } - else if (responseBody instanceof Detach) - { - error = ((Detach) responseBody).getError(); - } - else - { - fail(String.format("Expected response of either Detach, End, or Close. Got '%s'", responseBody)); - error = null; - } + assertThat(responseBody, is(notNullValue())); + assertThat(responseBody, instanceOf(ErrorCarryingFrameBody.class)); + + final Error error = ((ErrorCarryingFrameBody) responseBody).getError(); assertThat(error, is(notNullValue())); assertThat(error.getCondition(), is(equalTo(DECODE_ERROR))); @@ -169,7 +148,8 @@ public class DecodeErrorTest extends BrokerAdminUsingTestBase @Test @SpecificationTest(section = "3.5.9", - description = "The value of this entry MUST be of a type which provides the lifetime-policy archetype.") + description = "Node Properties [...] supported-dist-modes [...] " + + "The value of this entry MUST be of a type which provides the lifetime-policy archetype.") public void nodePropertiesSupportedDistributionModes() throws Exception { try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) @@ -189,70 +169,66 @@ public class DecodeErrorTest extends BrokerAdminUsingTestBase assertThat(latestResponse, is(notNullValue())); final Object responseBody = latestResponse.getBody(); - final Error error; - if (responseBody instanceof End) - { - error = ((End) responseBody).getError(); - } - else if (responseBody instanceof Close) - { - error = ((Close) responseBody).getError(); - } - else - { - fail(String.format("Expected response of either Detach, End, or Close. Got '%s'", responseBody)); - error = null; - } + assertThat(responseBody, is(notNullValue())); + assertThat(responseBody, instanceOf(ErrorCarryingFrameBody.class)); + final Error error = ((ErrorCarryingFrameBody) responseBody).getError(); assertThat(error, is(notNullValue())); assertThat(error.getCondition(), is(equalTo(DECODE_ERROR))); } } - private List<QpidByteBuffer> buildInvalidMessage() + private QpidByteBuffer buildInvalidMessage() { final List<QpidByteBuffer> payloads = new ArrayList<>(); - final Header header = new Header(); - header.setTtl(UnsignedInteger.valueOf(1000L)); - final HeaderSection headerSection = header.createEncodingRetainingSection(); try { - payloads.add(headerSection.getEncodedForm()); - } - finally - { - headerSection.dispose(); - } + final Header header = new Header(); + header.setTtl(UnsignedInteger.valueOf(10000L)); + final HeaderSection headerSection = header.createEncodingRetainingSection(); + try + { + payloads.add(headerSection.getEncodedForm()); + } + finally + { + headerSection.dispose(); + } - final StringWriter stringWriter = new StringWriter("string in between annotation sections"); - QpidByteBuffer encodedString = QpidByteBuffer.allocate(stringWriter.getEncodedSize()); - stringWriter.writeToBuffer(encodedString); - encodedString.flip(); - payloads.add(encodedString); + final StringWriter stringWriter = new StringWriter("string in between message sections"); + final QpidByteBuffer encodedString = QpidByteBuffer.allocate(stringWriter.getEncodedSize()); + stringWriter.writeToBuffer(encodedString); + encodedString.flip(); + payloads.add(encodedString); - final Map<Symbol, Object> annoationMap = Collections.singletonMap(Symbol.valueOf("foo"), "bar"); - final DeliveryAnnotations annotations = new DeliveryAnnotations(annoationMap); - final DeliveryAnnotationsSection deliveryAnnotationsSection = annotations.createEncodingRetainingSection(); - try - { + final Map<Symbol, Object> annotationMap = Collections.singletonMap(Symbol.valueOf("foo"), "bar"); + final DeliveryAnnotations annotations = new DeliveryAnnotations(annotationMap); + final DeliveryAnnotationsSection deliveryAnnotationsSection = annotations.createEncodingRetainingSection(); + try + { + payloads.add(deliveryAnnotationsSection.getEncodedForm()); + } + finally + { + deliveryAnnotationsSection.dispose(); + } - payloads.add(deliveryAnnotationsSection.getEncodedForm()); - } - finally - { - deliveryAnnotationsSection.dispose(); - } + final AmqpValueSection payload = new AmqpValue(getTestName()).createEncodingRetainingSection(); + try + { + payloads.add(payload.getEncodedForm()); + } + finally + { + payload.dispose(); + } - final AmqpValueSection payload = new AmqpValue(getTestName()).createEncodingRetainingSection(); - try - { - payloads.add(payload.getEncodedForm()); + return QpidByteBuffer.concatenate(payloads); } finally { - payload.dispose(); + payloads.forEach(QpidByteBuffer::dispose); } - return payloads; } } diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java index 6ac8e7f..5539291 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java @@ -20,10 +20,9 @@ package org.apache.qpid.tests.protocol.v1_0.messaging; -import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; -import static org.junit.Assert.fail; import java.net.InetSocketAddress; @@ -34,18 +33,13 @@ import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; import org.apache.qpid.server.protocol.v1_0.type.transport.Begin; -import org.apache.qpid.server.protocol.v1_0.type.transport.Close; -import org.apache.qpid.server.protocol.v1_0.type.transport.Detach; import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition; -import org.apache.qpid.server.protocol.v1_0.type.transport.End; -import org.apache.qpid.server.protocol.v1_0.type.transport.Error; import org.apache.qpid.server.protocol.v1_0.type.transport.Flow; import org.apache.qpid.server.protocol.v1_0.type.transport.Open; import org.apache.qpid.server.protocol.v1_0.type.transport.Role; -import org.apache.qpid.tests.protocol.v1_0.FrameTransport; import org.apache.qpid.tests.protocol.SpecificationTest; +import org.apache.qpid.tests.protocol.v1_0.FrameTransport; import org.apache.qpid.tests.protocol.v1_0.Utils; -import org.apache.qpid.tests.protocol.Response; import org.apache.qpid.tests.utils.BrokerAdmin; import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; @@ -72,7 +66,7 @@ public class MessageFormat extends BrokerAdminUsingTestBase { QpidByteBuffer[] payloads = Utils.splitPayload(getTestName(), 2); - final Response<?> latestResponse = transport.newInteraction() + transport.newInteraction() .negotiateProtocol().consumeResponse() .open().consumeResponse(Open.class) .begin().consumeResponse(Begin.class) @@ -83,6 +77,7 @@ public class MessageFormat extends BrokerAdminUsingTestBase .transferMore(true) .transferMessageFormat(UnsignedInteger.ZERO) .transferPayload(payloads[0]) + .transferSettled(true) .transfer() .consumeResponse(null, Flow.class, Disposition.class) .transferDeliveryTag(null) @@ -91,35 +86,16 @@ public class MessageFormat extends BrokerAdminUsingTestBase .transferMessageFormat(UnsignedInteger.ONE) .transferPayload(payloads[1]) .transfer() - .consumeResponse(Detach.class, End.class, Close.class) - .getLatestResponse(); + .sync(); for (final QpidByteBuffer payload : payloads) { payload.dispose(); } - assertThat(latestResponse, is(notNullValue())); - final Object responseBody = latestResponse.getBody(); - final Error error; - if (responseBody instanceof Detach) - { - error = ((Detach) responseBody).getError(); - } - else if (responseBody instanceof End) - { - error = ((End) responseBody).getError(); - } - else if (responseBody instanceof Close) - { - error = ((Close) responseBody).getError(); - } - else - { - fail(String.format("Expected response of either Detach, End, or Close. Got '%s'", responseBody)); - error = null; - } - - assertThat(error, is(notNullValue())); } + + final String testMessage = getTestName() + "_2"; + Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, testMessage); + assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(testMessage))); } } diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java index 536d6ab..766afe5 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java @@ -210,7 +210,6 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase { payload.dispose(); } - interaction.consumeResponse(null, Flow.class); } String secondMessage = getTestName() + "_2"; Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, secondMessage); @@ -320,7 +319,12 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase @Test @SpecificationTest(section = "2.6.14", - description = "[...]messages transferred along a single link MUST NOT be interleaved") + description = "For messages that are too large to fit within the maximum frame size," + + " additional data MAY be transferred in additional transfer frames by setting" + + " the more flag on all but the last transfer frame." + + " When a message is split up into multiple transfer frames in this manner," + + " messages being transferred along different links MAY be interleaved." + + " However, messages transferred along a single link MUST NOT be interleaved.") public void illegallyInterleavedMultiTransferOnSingleLink() throws Exception { String messageContent1 = getTestName() + "_1"; @@ -351,14 +355,28 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase .transferDeliveryTag(deliveryTag1) .transferMore(true) .transferPayload(messagePayload1[0]) + .transferSettled(true) .transfer() - .sync() .transferDeliveryId(deliveryId2) .transferDeliveryTag(deliveryTag2) .transferMore(true) + .transferSettled(true) .transferPayload(messagePayload2[0]) .transfer() + + .transferDeliveryId(deliverId1) + .transferDeliveryTag(deliveryTag1) + .transferMore(false) + .transferPayload(messagePayload1[1]) + .transfer() + + .transferDeliveryId(deliveryId2) + .transferDeliveryTag(deliveryTag2) + .transferMore(false) + .transferPayload(messagePayload2[1]) + .transfer() + .sync(); for (final QpidByteBuffer payload : messagePayload1) { @@ -369,7 +387,10 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase payload.dispose(); } - interaction.consumeResponse(Detach.class, End.class, Close.class, ChannelClosedResponse.class); } + + final String controlMessage = getTestName() + "_Control"; + Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, controlMessage); + assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(controlMessage))); } } diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java index 733ba94..6332ce4 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java @@ -68,7 +68,6 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Begin; import org.apache.qpid.server.protocol.v1_0.type.transport.Close; import org.apache.qpid.server.protocol.v1_0.type.transport.Detach; import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition; -import org.apache.qpid.server.protocol.v1_0.type.transport.End; import org.apache.qpid.server.protocol.v1_0.type.transport.Error; import org.apache.qpid.server.protocol.v1_0.type.transport.Flow; import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError; @@ -77,7 +76,6 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode; import org.apache.qpid.server.protocol.v1_0.type.transport.Role; import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode; import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer; -import org.apache.qpid.tests.protocol.ChannelClosedResponse; import org.apache.qpid.tests.protocol.Response; import org.apache.qpid.tests.protocol.SpecificationTest; import org.apache.qpid.tests.protocol.v1_0.FrameTransport; @@ -106,7 +104,7 @@ public class TransferTest extends BrokerAdminUsingTestBase @Test @SpecificationTest(section = "1.3.4", description = "mandatory [...] a non null value for the field is always encoded.") - public void emptyTransfer() throws Exception + public void transferHandleUnspecified() throws Exception { try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { @@ -123,25 +121,43 @@ public class TransferTest extends BrokerAdminUsingTestBase .consumeResponse() .getLatestResponse(); + assertThat(response, is(notNullValue())); assertThat(response.getBody(), is(notNullValue())); + assertThat(response.getBody(), is(instanceOf(ErrorCarryingFrameBody.class))); - if (response.getBody() instanceof Close) - { - final Close responseClose = (Close)response.getBody(); - assertThat(responseClose.getError(), is(notNullValue())); - assertThat(responseClose.getError().getCondition(), equalTo(AmqpError.DECODE_ERROR)); + final Error error = ((ErrorCarryingFrameBody)response.getBody()).getError(); + assertThat(error, is(notNullValue())); + assertThat(error.getCondition(), anyOf(equalTo(AmqpError.DECODE_ERROR), equalTo(AmqpError.INVALID_FIELD))); + } + } - interact.close().sync(); - } - else if (response.getBody() instanceof End) - { - final End responseEnd = (End)response.getBody(); - assertThat(responseEnd.getError(), is(notNullValue())); - assertThat(responseEnd.getError().getCondition(), equalTo(AmqpError.DECODE_ERROR)); + @Test + @SpecificationTest(section = "2.7.5", + description = "The delivery-id MUST be supplied on the first transfer of a multi-transfer delivery.") + public void transferDeliveryIdUnspecified() throws Exception + { + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + Interaction interact = transport.newInteraction(); + Response<?> response = interact.negotiateProtocol().consumeResponse() + .open().consumeResponse(Open.class) + .begin().consumeResponse(Begin.class) + .attachRole(Role.SENDER) + .attach().consumeResponse(Attach.class) + .consumeResponse(Flow.class) + .assertLatestResponse(Flow.class, this::assumeSufficientCredits) + .transferDeliveryId(null) + .transfer() + .consumeResponse() + .getLatestResponse(); - interact.end().doCloseConnection(); - } - transport.assertNoMoreResponses(); + assertThat(response, is(notNullValue())); + assertThat(response.getBody(), is(notNullValue())); + assertThat(response.getBody(), is(instanceOf(ErrorCarryingFrameBody.class))); + + final Error error = ((ErrorCarryingFrameBody)response.getBody()).getError(); + assertThat(error, is(notNullValue())); + assertThat(error.getCondition(), anyOf(equalTo(AmqpError.DECODE_ERROR), equalTo(AmqpError.INVALID_FIELD))); } } @@ -149,30 +165,40 @@ public class TransferTest extends BrokerAdminUsingTestBase @SpecificationTest(section = "2.7.5", description = "[delivery-tag] MUST be specified for the first transfer " + "[...] and can only be omitted for continuation transfers.") - public void transferWithoutDeliveryTag() throws Exception + public void transferDeliveryTagUnspecified() throws Exception { try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { Interaction interaction = transport.newInteraction() - .negotiateProtocol().consumeResponse() - .open().consumeResponse(Open.class) - .begin().consumeResponse(Begin.class) - .attachRole(Role.SENDER) - .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME) - .attach().consumeResponse(Attach.class) - .consumeResponse(Flow.class) - .assertLatestResponse(Flow.class, this::assumeSufficientCredits) - .transferDeliveryId() - .transferDeliveryTag(null) - .transferPayloadData(getTestName()) - .transfer(); - interaction.consumeResponse(Detach.class, End.class, Close.class, ChannelClosedResponse.class); + .negotiateProtocol().consumeResponse() + .open().consumeResponse(Open.class) + .begin().consumeResponse(Begin.class) + .attachRole(Role.SENDER) + .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME) + .attach().consumeResponse(Attach.class) + .consumeResponse(Flow.class) + .assertLatestResponse(Flow.class, this::assumeSufficientCredits) + .transferDeliveryId() + .transferDeliveryTag(null) + .transferPayloadData(getTestName()) + .transfer() + .consumeResponse(); + + final Response<?> response = interaction.getLatestResponse(); + assertThat(response, is(notNullValue())); + assertThat(response.getBody(), is(notNullValue())); + assertThat(response.getBody(), is(instanceOf(ErrorCarryingFrameBody.class))); + + final Error error = ((ErrorCarryingFrameBody)response.getBody()).getError(); + assertThat(error, is(notNullValue())); + assertThat(error.getCondition(), anyOf(equalTo(AmqpError.DECODE_ERROR), equalTo(AmqpError.INVALID_FIELD))); } } @Test - @SpecificationTest(section = "2.6.12", - description = "Transferring A Message.") + @SpecificationTest(section = "2.6.12 Transferring A Message", + description = "[...] the receiving application chooses to settle immediately upon processing the message" + + " rather than waiting for the sender to settle first, that yields an at-least-once guarantee.") public void transferUnsettled() throws Exception { try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) @@ -319,27 +345,19 @@ public class TransferTest extends BrokerAdminUsingTestBase .consumeResponse() .getLatestResponse(); - if (response.getBody() instanceof Detach) - { - final Detach detach = (Detach) response.getBody(); - Error error = detach.getError(); - assertThat(error, is(notNullValue())); - assertThat(error.getCondition(), is(equalTo(AmqpError.INVALID_FIELD))); - } - else - { - if (response.getBody() instanceof Disposition) - { - // clean up - Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME); - } - fail("it is illegal to set transfer 'rcv-settle-mode' to 'second' when link 'rcv-settle-mode' is set to 'first'"); - } + assertThat(response, is(notNullValue())); + assertThat(response.getBody(), is(notNullValue())); + assertThat(response.getBody(), is(instanceOf(Detach.class))); + + final Detach detach = (Detach) response.getBody(); + Error error = detach.getError(); + assertThat(error, is(notNullValue())); + assertThat(error.getCondition(), is(equalTo(AmqpError.INVALID_FIELD))); } } @Test - @SpecificationTest(section = "", description = "Pipelined message send") + @SpecificationTest(section = "2.6.12 Transferring A Message", description = "Pipelined message send") public void presettledPipelined() throws Exception { try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java index 17fdab0..f0f46f4 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java @@ -25,6 +25,7 @@ import static org.hamcrest.CoreMatchers.both; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -74,7 +75,7 @@ public class OpenTest extends BrokerAdminUsingTestBase Error error = responseClose.getError(); if (error != null) { - assertThat(error.getCondition(), equalTo(AmqpError.DECODE_ERROR)); + assertThat(error.getCondition(), anyOf(equalTo(AmqpError.DECODE_ERROR), equalTo(AmqpError.INVALID_FIELD))); } } } @@ -89,16 +90,18 @@ public class OpenTest extends BrokerAdminUsingTestBase try (FrameTransport transport = new FrameTransport(addr).connect()) { Interaction interaction = transport.newInteraction(); - Open responseOpen = interaction - .negotiateProtocol().consumeResponse() - .openContainerId("testContainerId") - .open().consumeResponse() - .getLatestResponse(Open.class); + final Open responseOpen = interaction.negotiateProtocol().consumeResponse() + .openContainerId("testContainerId") + .open().consumeResponse() + .getLatestResponse(Open.class); + assertThat(responseOpen.getContainerId(), is(notNullValue())); - assertThat(responseOpen.getMaxFrameSize().longValue(), - is(both(greaterThanOrEqualTo(0L)).and(lessThanOrEqualTo(UnsignedInteger.MAX_VALUE.longValue())))); - assertThat(responseOpen.getChannelMax().intValue(), - is(both(greaterThanOrEqualTo(0)).and(lessThanOrEqualTo(UnsignedShort.MAX_VALUE.intValue())))); + assertThat(responseOpen.getMaxFrameSize(), + is(anyOf(nullValue(), + both(greaterThan(UnsignedInteger.ZERO)).and(lessThanOrEqualTo(UnsignedInteger.MAX_VALUE))))); + assertThat(responseOpen.getChannelMax(), + is(anyOf(nullValue(), + both(greaterThanOrEqualTo(UnsignedShort.ZERO)).and(lessThanOrEqualTo(UnsignedShort.MAX_VALUE))))); interaction.doCloseConnection(); } diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java index 75f5985..a170e7e 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java @@ -19,10 +19,12 @@ package org.apache.qpid.tests.protocol.v1_0.transport.link; +import static org.hamcrest.CoreMatchers.anyOf; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.both; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.notNullValue; @@ -32,6 +34,7 @@ import java.net.InetSocketAddress; import org.junit.Test; +import org.apache.qpid.server.protocol.v1_0.type.ErrorCarryingFrameBody; import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError; import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; @@ -39,8 +42,10 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Begin; import org.apache.qpid.server.protocol.v1_0.type.transport.Close; import org.apache.qpid.server.protocol.v1_0.type.transport.Detach; import org.apache.qpid.server.protocol.v1_0.type.transport.End; +import org.apache.qpid.server.protocol.v1_0.type.transport.Error; import org.apache.qpid.server.protocol.v1_0.type.transport.Open; import org.apache.qpid.server.protocol.v1_0.type.transport.Role; +import org.apache.qpid.tests.protocol.Response; import org.apache.qpid.tests.protocol.v1_0.FrameTransport; import org.apache.qpid.tests.protocol.v1_0.Interaction; import org.apache.qpid.tests.protocol.SpecificationTest; @@ -58,17 +63,23 @@ public class AttachTest extends BrokerAdminUsingTestBase final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); try (FrameTransport transport = new FrameTransport(addr).connect()) { - Close responseClose = transport.newInteraction() - .negotiateProtocol().consumeResponse() - .open().consumeResponse(Open.class) - .begin().consumeResponse(Begin.class) - .attachRole(null) - .attachHandle(null) - .attachName(null) - .attach().consumeResponse() - .getLatestResponse(Close.class); - assertThat(responseClose.getError(), is(notNullValue())); - assertThat(responseClose.getError().getCondition(), equalTo(AmqpError.DECODE_ERROR)); + final Response<?> response = transport.newInteraction() + .negotiateProtocol().consumeResponse() + .open().consumeResponse(Open.class) + .begin().consumeResponse(Begin.class) + .attachRole(null) + .attachHandle(null) + .attachName(null) + .attach().consumeResponse() + .getLatestResponse(); + assertThat(response.getBody(), is(notNullValue())); + assertThat(response.getBody(), instanceOf(ErrorCarryingFrameBody.class)); + final Error error = ((ErrorCarryingFrameBody) response.getBody()).getError(); + if (error != null) + { + assertThat(error.getCondition(), + anyOf(equalTo(AmqpError.DECODE_ERROR), equalTo(AmqpError.INVALID_FIELD))); + } } } diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java index 448de42..1f9968b 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java @@ -20,6 +20,7 @@ package org.apache.qpid.tests.protocol.v1_0.transport.link; +import static org.hamcrest.CoreMatchers.anyOf; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.notNullValue; @@ -32,13 +33,14 @@ import java.net.InetSocketAddress; import org.junit.Test; +import org.apache.qpid.server.protocol.v1_0.type.ErrorCarryingFrameBody; import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted; import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError; import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; import org.apache.qpid.server.protocol.v1_0.type.transport.Begin; -import org.apache.qpid.server.protocol.v1_0.type.transport.Close; import org.apache.qpid.server.protocol.v1_0.type.transport.End; +import org.apache.qpid.server.protocol.v1_0.type.transport.Error; import org.apache.qpid.server.protocol.v1_0.type.transport.Flow; import org.apache.qpid.server.protocol.v1_0.type.transport.Open; import org.apache.qpid.server.protocol.v1_0.type.transport.Role; @@ -62,19 +64,24 @@ public class FlowTest extends BrokerAdminUsingTestBase final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); try (FrameTransport transport = new FrameTransport(addr).connect()) { - Close responseClose = transport.newInteraction() - .negotiateProtocol().consumeResponse() - .open().consumeResponse(Open.class) - .begin().consumeResponse(Begin.class) - .flowIncomingWindow(null) - .flowNextIncomingId(null) - .flowOutgoingWindow(null) - .flowNextOutgoingId(null) - .flow() - .consumeResponse(Close.class) - .getLatestResponse(Close.class); - assertThat(responseClose.getError(), is(notNullValue())); - assertThat(responseClose.getError().getCondition(), is(AmqpError.DECODE_ERROR)); + final Response<?> response = transport.newInteraction() + .negotiateProtocol().consumeResponse() + .open().consumeResponse(Open.class) + .begin().consumeResponse(Begin.class) + .flowIncomingWindow(null) + .flowNextIncomingId(null) + .flowOutgoingWindow(null) + .flowNextOutgoingId(null) + .flow() + .consumeResponse() + .getLatestResponse(); + assertThat(response, is(notNullValue())); + assertThat(response.getBody(), is(instanceOf(ErrorCarryingFrameBody.class))); + final Error error = ((ErrorCarryingFrameBody) response.getBody()).getError(); + if (error != null) + { + assertThat(error.getCondition(), anyOf(equalTo(AmqpError.DECODE_ERROR), equalTo(AmqpError.INVALID_FIELD))); + } } } diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/LinkStealingTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/LinkStealingTest.java index 260a3bc..fabcfa9 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/LinkStealingTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/LinkStealingTest.java @@ -47,6 +47,7 @@ import org.apache.qpid.tests.protocol.v1_0.FrameTransport; import org.apache.qpid.tests.protocol.v1_0.Interaction; import org.apache.qpid.tests.utils.BrokerAdmin; import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; +import org.apache.qpid.tests.utils.BrokerSpecific; public class LinkStealingTest extends BrokerAdminUsingTestBase { @@ -94,7 +95,9 @@ public class LinkStealingTest extends BrokerAdminUsingTestBase @Test - @SpecificationTest(section = "2.6.1. Naming a link", description = "") + @SpecificationTest(section = "2.6.1. Naming a link", + description = "Qpid Broker J extended stolen behaviour on sessions") + @BrokerSpecific(kind = BrokerAdmin.KIND_BROKER_J) public void subsequentAttachOnDifferentSessions() throws Exception { getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME); diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java index f25d275..a1ea481 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java @@ -19,6 +19,7 @@ package org.apache.qpid.tests.protocol.v1_0.transport.session; +import static org.hamcrest.CoreMatchers.anyOf; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; @@ -31,13 +32,16 @@ import java.net.InetSocketAddress; import org.junit.Test; +import org.apache.qpid.server.protocol.v1_0.type.ErrorCarryingFrameBody; import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort; import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError; import org.apache.qpid.server.protocol.v1_0.type.transport.Begin; import org.apache.qpid.server.protocol.v1_0.type.transport.Close; import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError; +import org.apache.qpid.server.protocol.v1_0.type.transport.Error; import org.apache.qpid.server.protocol.v1_0.type.transport.Open; +import org.apache.qpid.tests.protocol.Response; import org.apache.qpid.tests.protocol.v1_0.Interaction; import org.apache.qpid.tests.utils.BrokerAdmin; import org.apache.qpid.tests.protocol.v1_0.FrameTransport; @@ -54,16 +58,21 @@ public class BeginTest extends BrokerAdminUsingTestBase final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); try(FrameTransport transport = new FrameTransport(addr).connect()) { - Close responseClose = transport.newInteraction() + final Response<?> response = transport.newInteraction() .negotiateProtocol().consumeResponse() .open().consumeResponse(Open.class) .beginNextOutgoingId(null) .beginIncomingWindow(null) .beginOutgoingWindow(null) .begin().consumeResponse() - .getLatestResponse(Close.class); - assumeThat(responseClose.getError(), is(notNullValue())); - assertThat(responseClose.getError().getCondition(), equalTo(AmqpError.DECODE_ERROR)); + .getLatestResponse(); + assertThat(response, is(notNullValue())); + assertThat(response.getBody(), is(instanceOf(ErrorCarryingFrameBody.class))); + final Error error = ((ErrorCarryingFrameBody) response.getBody()).getError(); + if (error != null) + { + assertThat(error.getCondition(), anyOf(equalTo(AmqpError.DECODE_ERROR), equalTo(AmqpError.INVALID_FIELD))); + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
