Repository: qpid-broker-j Updated Branches: refs/heads/master 6c0c5b1ff -> 99fa51f01
QPID-8038: [Broker-J] [AMQP 0-x] Add tests related to protocol negotiation and oversized frames. Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/a9e61c16 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/a9e61c16 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/a9e61c16 Branch: refs/heads/master Commit: a9e61c16b742d266a9b75d54c18c76fcd9341c8a Parents: 6c0c5b1 Author: Keith Wall <[email protected]> Authored: Wed Jan 3 13:31:50 2018 +0000 Committer: Keith Wall <[email protected]> Committed: Thu Jan 4 10:16:39 2018 +0000 ---------------------------------------------------------------------- .../tests/protocol/v0_10/FrameTransport.java | 1 + .../qpid/tests/protocol/v0_10/Interaction.java | 11 +- .../tests/protocol/v0_10/ConnectionTest.java | 30 ---- .../qpid/tests/protocol/v0_10/ProtocolTest.java | 154 +++++++++++++++++++ .../tests/protocol/v0_8/BasicInteraction.java | 20 +++ .../tests/protocol/v0_8/FrameTransport.java | 2 +- .../qpid/tests/protocol/v0_8/Interaction.java | 12 +- .../tests/protocol/v0_8/ConnectionTest.java | 46 +++++- .../qpid/tests/protocol/v0_8/ProtocolTest.java | 154 +++++++++++++++++++ .../qpid/tests/protocol/v1_0/Interaction.java | 1 + .../tests/protocol/AbstractFrameTransport.java | 5 + .../tests/protocol/AbstractInteraction.java | 2 + 12 files changed, 403 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a9e61c16/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameTransport.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameTransport.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameTransport.java index 
3b7849c..8bf7bc4 100644 --- a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameTransport.java +++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameTransport.java @@ -22,6 +22,7 @@ package org.apache.qpid.tests.protocol.v0_10; import java.net.InetSocketAddress; +import org.apache.qpid.server.protocol.ProtocolVersion; import org.apache.qpid.server.protocol.v0_10.ProtocolEngineCreator_0_10; import org.apache.qpid.tests.protocol.AbstractFrameTransport; http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a9e61c16/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java index 2398636..03deef5 100644 --- a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java +++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java @@ -37,6 +37,7 @@ import org.apache.qpid.tests.protocol.AbstractInteraction; public class Interaction extends AbstractInteraction<Interaction> { + private byte[] _protocolHeader; private ConnectionInteraction _connectionInteraction; private SessionInteraction _sessionInteraction; private MessageInteraction _messageInteraction; @@ -56,12 +57,20 @@ public class Interaction extends AbstractInteraction<Interaction> _txInteraction = new TxInteraction(this); _queueInteraction = new QueueInteraction(this); _exchangeInteraction = new ExchangeInteraction(this); + _protocolHeader = getTransport().getProtocolHeader(); + } + + @Override + public Interaction protocolHeader(final byte[] header) + { + _protocolHeader = header; + return this; } @Override 
protected byte[] getProtocolHeader() { - return getTransport().getProtocolHeader(); + return _protocolHeader; } public <T extends Method> Interaction sendPerformative(final T performative) throws Exception http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a9e61c16/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java index 1072f7c..d273f02 100644 --- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java +++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java @@ -20,17 +20,12 @@ */ package org.apache.qpid.tests.protocol.v0_10; -import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.lessThan; import static org.junit.Assume.assumeThat; import java.net.InetSocketAddress; -import org.hamcrest.core.IsEqual; import org.junit.Before; import org.junit.Test; @@ -40,8 +35,6 @@ import org.apache.qpid.server.protocol.v0_10.transport.ConnectionSecure; import org.apache.qpid.server.protocol.v0_10.transport.ConnectionStart; import org.apache.qpid.server.protocol.v0_10.transport.ConnectionTune; import org.apache.qpid.tests.protocol.ChannelClosedResponse; -import org.apache.qpid.tests.protocol.HeaderResponse; -import org.apache.qpid.tests.protocol.Response; import org.apache.qpid.tests.protocol.SpecificationTest; import org.apache.qpid.tests.utils.BrokerAdmin; import 
org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; @@ -58,29 +51,6 @@ public class ConnectionTest extends BrokerAdminUsingTestBase } @Test - @SpecificationTest(section = "4.3. Version Negotiation", - description = "When the client opens a new socket connection to an AMQP server," - + " it MUST send a protocol header with the client's preferred protocol version." - + "If the requested protocol version is supported, the server MUST send its own protocol" - + " header with the requested version to the socket, and then implement the protocol accordingly") - public void versionNegotiation() throws Exception - { - try(FrameTransport transport = new FrameTransport(_brokerAddress).connect()) - { - final Interaction interaction = transport.newInteraction(); - Response<?> response = interaction.negotiateProtocol().consumeResponse().getLatestResponse(); - assertThat(response, is(instanceOf(HeaderResponse.class))); - assertThat(response.getBody(), is(IsEqual.equalTo(transport.getProtocolHeader()))); - - ConnectionStart connectionStart = interaction.consumeResponse().getLatestResponse(ConnectionStart.class); - assertThat(connectionStart.getMechanisms(), is(notNullValue())); - assertThat(connectionStart.getMechanisms(), contains(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS)); - assertThat(connectionStart.getLocales(), is(notNullValue())); - assertThat(connectionStart.getLocales(), contains(DEFAULT_LOCALE)); - } - } - - @Test @SpecificationTest(section = "9.connection.start-ok", description = "An AMQP client MUST handle incoming connection.start controls.") public void startOk() throws Exception http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a9e61c16/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ProtocolTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ProtocolTest.java 
b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ProtocolTest.java new file mode 100644 index 0000000..35675e1 --- /dev/null +++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ProtocolTest.java @@ -0,0 +1,154 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.qpid.tests.protocol.v0_10; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.junit.Assert.assertArrayEquals; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; + +import org.hamcrest.core.IsEqual; +import org.junit.Before; +import org.junit.Test; + +import org.apache.qpid.server.protocol.v0_10.transport.ConnectionStart; +import org.apache.qpid.tests.protocol.HeaderResponse; +import org.apache.qpid.tests.protocol.Response; +import org.apache.qpid.tests.protocol.SpecificationTest; +import org.apache.qpid.tests.utils.BrokerAdmin; +import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; + +public class ProtocolTest extends BrokerAdminUsingTestBase +{ + private static final String DEFAULT_LOCALE = "en_US"; + private InetSocketAddress _brokerAddress; + + @Before + public void setUp() + { + _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); + } + + @Test + @SpecificationTest(section = "4.3. Version Negotiation", + description = "When the client opens a new socket connection to an AMQP server," + + " it MUST send a protocol header with the client's preferred protocol version." 
+ + "If the requested protocol version is supported, the server MUST send its own protocol" + + " header with the requested version to the socket, and then implement the protocol accordingly") + public void versionNegotiation() throws Exception + { + try(FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + Response<?> response = interaction.negotiateProtocol().consumeResponse().getLatestResponse(); + assertThat(response, is(instanceOf(HeaderResponse.class))); + assertThat(response.getBody(), is(IsEqual.equalTo(transport.getProtocolHeader()))); + + ConnectionStart connectionStart = interaction.consumeResponse().getLatestResponse(ConnectionStart.class); + assertThat(connectionStart.getMechanisms(), is(notNullValue())); + assertThat(connectionStart.getMechanisms(), contains(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS)); + assertThat(connectionStart.getLocales(), is(notNullValue())); + assertThat(connectionStart.getLocales(), contains(DEFAULT_LOCALE)); + } + } + + @Test + @SpecificationTest(section = "4.3. 
Version Negotiation", + description = "If the server can't parse the protocol header, the server MUST send a valid protocol " + + "header with a supported protocol version and then close the socket.") + public void unrecognisedProtocolHeader() throws Exception + { + try(FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + + final Interaction interaction = transport.newInteraction(); + + byte[] unknownHeader = "NOTANAMQPHEADER".getBytes(StandardCharsets.UTF_8); + byte[] expectedResponse = "AMQP\001\001\000\012".getBytes(StandardCharsets.UTF_8); + final byte[] response = interaction.protocolHeader(unknownHeader) + .negotiateProtocol() + .consumeResponse().getLatestResponse(byte[].class); + assertArrayEquals("Unexpected protocol header response", expectedResponse, response); + transport.assertNoMoreResponsesAndChannelClosed(); + } + } + + @Test + @SpecificationTest(section = "4.3. Version Negotiation", + description = "If the requested protocol version is not supported, the server MUST send a protocol " + + "header with a supported protocol version and then close the socket.") + public void unrecognisedProtocolVersion() throws Exception + { + try(FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + + + final Interaction interaction = transport.newInteraction(); + + byte[] unknownAmqpVersion = "AMQP\001\001\000\013".getBytes(StandardCharsets.UTF_8); + byte[] expectedResponse = "AMQP\001\001\000\012".getBytes(StandardCharsets.UTF_8); + final byte[] response = interaction.protocolHeader(unknownAmqpVersion) + .negotiateProtocol() + .consumeResponse().getLatestResponse(byte[].class); + assertArrayEquals("Unexpected protocol header response", expectedResponse, response); + transport.assertNoMoreResponsesAndChannelClosed(); + } + } + + @Test + @SpecificationTest(section = "8. 
Domains", description = "valid values for the frame type indicator.") + public void invalidSegmentType() throws Exception + { + try(FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + + interaction.negotiateProtocol().consumeResponse() + .consumeResponse(ConnectionStart.class); + + try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos)) + { + dos.writeByte(0); /* flags */ + dos.writeByte(4); /* segment type - undefined value in 0-10 */ + dos.writeShort(12); /* size */ + dos.writeByte(0); + dos.writeByte(0); /* track */ + dos.writeShort(0); /* channel */ + dos.writeByte(0); + dos.writeByte(0); + dos.writeByte(0); + dos.writeByte(0); + + transport.sendBytes(bos.toByteArray()); + } + transport.flush(); + transport.assertNoMoreResponsesAndChannelClosed(); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a9e61c16/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java index c004e38..718c41d 100644 --- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java +++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.protocol.v0_8.AMQShortString; import org.apache.qpid.server.protocol.v0_8.FieldTable; import org.apache.qpid.server.protocol.v0_8.transport.AMQFrame; @@ -125,6 +126,25 @@ public 
class BasicInteraction return this; } + public Interaction contentHeader(int contentSize) throws Exception + { + final BasicContentHeaderProperties basicContentHeaderProperties = new BasicContentHeaderProperties(); + basicContentHeaderProperties.setHeaders(FieldTable.convertToFieldTable(_contentHeaderPropertiesHeaders)); + basicContentHeaderProperties.setContentType(_contentHeaderPropertiesContentType); + basicContentHeaderProperties.setDeliveryMode(_contentHeaderPropertiesDeliveryMode); + basicContentHeaderProperties.setPriority(_contentHeaderPropertiesPriority); + ContentHeaderBody contentHeaderBody = new ContentHeaderBody(basicContentHeaderProperties, contentSize); + return _interaction.sendPerformative(contentHeaderBody); + } + + public Interaction contentBody(final byte[] bytes) throws Exception + { + try (QpidByteBuffer buf = QpidByteBuffer.wrap(bytes)) + { + final ContentBody contentBody = new ContentBody(buf); + return _interaction.sendPerformative(contentBody); + } + } public Interaction publishMessage() throws Exception { http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a9e61c16/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java index 432eab8..cee4c0f 100644 --- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java +++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java @@ -76,7 +76,7 @@ public class FrameTransport extends AbstractFrameTransport<Interaction> return _protocolHeader; } - ProtocolVersion getProtocolVersion() + public ProtocolVersion getProtocolVersion() { return _protocolVersion; } 
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a9e61c16/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java index b990eae..5e89af8 100644 --- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java +++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java @@ -30,7 +30,7 @@ import org.apache.qpid.tests.protocol.AbstractInteraction; public class Interaction extends AbstractInteraction<Interaction> { - + private byte[] _protocolHeader; private int _channelId; private int _maximumPayloadSize = 512; private ConnectionInteraction _connectionInteraction; @@ -49,12 +49,20 @@ public class Interaction extends AbstractInteraction<Interaction> _basicInteraction = new BasicInteraction(this); _txInteraction = new TxInteraction(this); _exchangeInteraction = new ExchangeInteraction(this); + _protocolHeader = getTransport().getProtocolHeader(); + } + + @Override + public Interaction protocolHeader(final byte[] header) + { + _protocolHeader = header; + return this; } @Override protected byte[] getProtocolHeader() { - return getTransport().getProtocolHeader(); + return _protocolHeader; } public Interaction sendPerformative(final AMQBody amqBody) throws Exception http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a9e61c16/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java 
b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java index fa4a692..9c693a3 100644 --- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java +++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java @@ -29,6 +29,7 @@ import java.net.InetSocketAddress; import org.junit.Before; import org.junit.Test; +import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody; import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseBody; import org.apache.qpid.server.protocol.v0_8.transport.ConnectionOpenOkBody; import org.apache.qpid.server.protocol.v0_8.transport.ConnectionSecureBody; @@ -153,6 +154,50 @@ public class ConnectionTest extends BrokerAdminUsingTestBase } @Test + @SpecificationTest(section = "4.2.3", + description = "A peer MUST NOT send frames larger than the agreed-upon size. A peer that receives an " + + "oversized frame MUST signal a connection exception with reply code 501 (frame error).") + public void overlySizedContentBodyFrame() throws Exception + { + try(FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + ConnectionTuneBody response = interaction.negotiateProtocol() + .consumeResponse(ConnectionStartBody.class) + .connection().startOkMechanism("ANONYMOUS") + .startOk() + .consumeResponse().getLatestResponse(ConnectionTuneBody.class); + + final long frameMax = response.getFrameMax(); + // Older Qpid JMS Client 0-x had a defect that meant they could send content body frames that were too + // large. Rather then limiting the user content of each frame to frameSize - 8, it sent frameSize bytes + // of user content meaning the resultant frame was too big. The server accommodates this behaviour + // by reducing the frame-size advertised to the client. 
+ final int overlyLargeFrameBodySize = (int) (frameMax + 1); // Should be frameMax - 8 + 1. + final byte[] bodyBytes = new byte[overlyLargeFrameBodySize]; + + interaction.connection() + .tuneOkChannelMax(response.getChannelMax()) + .tuneOkFrameMax(frameMax) + .tuneOkHeartbeat(response.getHeartbeat()) + .tuneOk() + .connection().open() + .consumeResponse(ConnectionOpenOkBody.class) + .channel().open() + .consumeResponse(ChannelOpenOkBody.class) + .basic().publish() + .basic().contentHeader(bodyBytes.length) + .basic().contentBody(bodyBytes) + .sync(); + + final ChannelClosedResponse closeResponse = interaction.consumeResponse() + .getLatestResponse(ChannelClosedResponse.class); + //TODO: The ChannelClosedResponse is wrong. + //assertThat(res.getReplyCode(), CoreMatchers.is(CoreMatchers.equalTo(ErrorCodes.COMMAND_INVALID))); + } + } + + @Test @SpecificationTest(section = "1.4.", description = "open connection = C:protocolheader S:START C:START OK" + " *challenge S:TUNE C:TUNE OK C:OPEN S:OPEN OK") public void authenticationBypassBySendingTuneOk() throws Exception @@ -201,5 +246,4 @@ public class ConnectionTest extends BrokerAdminUsingTestBase .consumeResponse(ConnectionCloseBody.class, ChannelClosedResponse.class); } } - } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a9e61c16/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ProtocolTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ProtocolTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ProtocolTest.java new file mode 100644 index 0000000..8475bb9 --- /dev/null +++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ProtocolTest.java @@ -0,0 +1,154 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.protocol.v0_8; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assume.assumeThat; + +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.qpid.server.protocol.ProtocolVersion; +import org.apache.qpid.server.protocol.v0_8.transport.AMQBody; +import org.apache.qpid.server.protocol.v0_8.transport.AMQVersionAwareProtocolSession; +import org.apache.qpid.server.protocol.v0_8.transport.ConnectionStartBody; +import org.apache.qpid.server.transport.ByteBufferSender; +import org.apache.qpid.tests.protocol.SpecificationTest; +import org.apache.qpid.tests.utils.BrokerAdmin; +import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; + +public class ProtocolTest extends BrokerAdminUsingTestBase +{ + private InetSocketAddress _brokerAddress; + + @Before + public void setUp() + { + _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); + } + + @Test + @SpecificationTest(section = "4.2.2", + description = "If the server does not recognise the first 5 octets of data on the socket [...], it MUST " + 
+ "write a valid protocol header to the socket, [...] and then close the socket connection.") + public void unrecognisedProtocolHeader() throws Exception + { + try(FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + assumeThat(transport.getProtocolVersion(), is(equalTo(ProtocolVersion.v0_91))); + + final Interaction interaction = transport.newInteraction(); + + byte[] unknownHeader = "NOTANAMQPHEADER".getBytes(StandardCharsets.UTF_8); + byte[] expectedResponse = "AMQP\000\000\011\001".getBytes(StandardCharsets.UTF_8); + final byte[] response = interaction.protocolHeader(unknownHeader) + .negotiateProtocol() + .consumeResponse().getLatestResponse(byte[].class); + assertArrayEquals("Unexpected protocol header response", expectedResponse, response); + transport.assertNoMoreResponsesAndChannelClosed(); + } + } + + @Test + @SpecificationTest(section = "4.2.2", + description = "If the server [...] does not support the specific protocol version that the client " + + "requests, it MUST write a valid protocol header to the socket, [...] and then close " + + "the socket connection.") + public void unrecognisedProtocolVersion() throws Exception + { + try(FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + assumeThat(transport.getProtocolVersion(), is(equalTo(ProtocolVersion.v0_91))); + + final Interaction interaction = transport.newInteraction(); + + byte[] unknownAmqpVersion = "AMQP\000\000\010\002".getBytes(StandardCharsets.UTF_8); + byte[] expectedResponse = "AMQP\000\000\011\001".getBytes(StandardCharsets.UTF_8); + final byte[] response = interaction.protocolHeader(unknownAmqpVersion) + .negotiateProtocol() + .consumeResponse().getLatestResponse(byte[].class); + assertArrayEquals("Unexpected protocol header response", expectedResponse, response); + transport.assertNoMoreResponsesAndChannelClosed(); + } + } + + @Test + @SpecificationTest(section = "4.2.2", description = "The server either accepts [...] 
the protocol header") + public void validProtocolVersion() throws Exception + { + try(FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + + interaction.negotiateProtocol() + .consumeResponse().getLatestResponse(ConnectionStartBody.class); + + } + } + + @Test + @SpecificationTest(section = "4.2.2", + description = "If a peer receives a frame with a type that is not one of these defined types, it MUST " + + "treat this as a fatal protocol error and close the connection without sending any " + + "further data on it") + public void unrecognisedFrameType() throws Exception + { + try(FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + + interaction.negotiateProtocol() + .consumeResponse(ConnectionStartBody.class) + .sendPerformative(new AMQBody() + { + @Override + public byte getFrameType() + { + return (byte)5; // Spec defines 1 - 4 only. 
+ } + + @Override + public int getSize() + { + return 0; + } + + @Override + public long writePayload(final ByteBufferSender sender) + { + return 0; + } + + @Override + public void handle(final int channelId, final AMQVersionAwareProtocolSession session) + { + throw new UnsupportedOperationException(); + } + }).sync(); + transport.assertNoMoreResponsesAndChannelClosed(); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a9e61c16/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java index b2f8147..7a372a7 100644 --- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java +++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java @@ -166,6 +166,7 @@ public class Interaction extends AbstractInteraction<Interaction> // Protocol Negotiation // ///////////////////////// + @Override public Interaction protocolHeader(byte[] header) { _protocolHeader = header; http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a9e61c16/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractFrameTransport.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractFrameTransport.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractFrameTransport.java index cad8415..91b0454 100644 --- a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractFrameTransport.java +++ 
b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractFrameTransport.java @@ -131,6 +131,11 @@ public abstract class AbstractFrameTransport<I extends AbstractInteraction<I>> i ListenableFuture<Void> sendProtocolHeader(final byte[] bytes) throws Exception { + return sendBytes(bytes); + } + + public ListenableFuture<Void> sendBytes(final byte[] bytes) + { Preconditions.checkState(_channel != null, "Not connected"); ChannelPromise promise = _channel.newPromise(); ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a9e61c16/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java index 4b41ca9..2c977f3 100644 --- a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java +++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java @@ -141,6 +141,8 @@ public abstract class AbstractInteraction<I extends AbstractInteraction<I>> return _transport; } + public abstract I protocolHeader(final byte[] header); + protected abstract byte[] getProtocolHeader(); private I getInteraction() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
