Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 6c0c5b1ff -> 99fa51f01


QPID-8038: [Broker-J] [AMQP 0-x] Add tests related to protocol negotiation and 
oversized frames.


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/a9e61c16
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/a9e61c16
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/a9e61c16

Branch: refs/heads/master
Commit: a9e61c16b742d266a9b75d54c18c76fcd9341c8a
Parents: 6c0c5b1
Author: Keith Wall <[email protected]>
Authored: Wed Jan 3 13:31:50 2018 +0000
Committer: Keith Wall <[email protected]>
Committed: Thu Jan 4 10:16:39 2018 +0000

----------------------------------------------------------------------
 .../tests/protocol/v0_10/FrameTransport.java    |   1 +
 .../qpid/tests/protocol/v0_10/Interaction.java  |  11 +-
 .../tests/protocol/v0_10/ConnectionTest.java    |  30 ----
 .../qpid/tests/protocol/v0_10/ProtocolTest.java | 154 +++++++++++++++++++
 .../tests/protocol/v0_8/BasicInteraction.java   |  20 +++
 .../tests/protocol/v0_8/FrameTransport.java     |   2 +-
 .../qpid/tests/protocol/v0_8/Interaction.java   |  12 +-
 .../tests/protocol/v0_8/ConnectionTest.java     |  46 +++++-
 .../qpid/tests/protocol/v0_8/ProtocolTest.java  | 154 +++++++++++++++++++
 .../qpid/tests/protocol/v1_0/Interaction.java   |   1 +
 .../tests/protocol/AbstractFrameTransport.java  |   5 +
 .../tests/protocol/AbstractInteraction.java     |   2 +
 12 files changed, 403 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a9e61c16/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameTransport.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameTransport.java
 
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameTransport.java
index 3b7849c..8bf7bc4 100644
--- 
a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameTransport.java
+++ 
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameTransport.java
@@ -22,6 +22,7 @@ package org.apache.qpid.tests.protocol.v0_10;
 
 import java.net.InetSocketAddress;
 
+import org.apache.qpid.server.protocol.ProtocolVersion;
 import org.apache.qpid.server.protocol.v0_10.ProtocolEngineCreator_0_10;
 import org.apache.qpid.tests.protocol.AbstractFrameTransport;
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a9e61c16/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
 
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
index 2398636..03deef5 100644
--- 
a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
+++ 
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
@@ -37,6 +37,7 @@ import org.apache.qpid.tests.protocol.AbstractInteraction;
 
 public class Interaction extends AbstractInteraction<Interaction>
 {
+    private byte[] _protocolHeader;
     private ConnectionInteraction _connectionInteraction;
     private SessionInteraction _sessionInteraction;
     private MessageInteraction _messageInteraction;
@@ -56,12 +57,20 @@ public class Interaction extends 
AbstractInteraction<Interaction>
         _txInteraction = new TxInteraction(this);
         _queueInteraction = new QueueInteraction(this);
         _exchangeInteraction = new ExchangeInteraction(this);
+        _protocolHeader = getTransport().getProtocolHeader();
+    }
+
+    @Override
+    public Interaction protocolHeader(final byte[] header)
+    {
+        _protocolHeader = header;
+        return this;
     }
 
     @Override
     protected byte[] getProtocolHeader()
     {
-        return getTransport().getProtocolHeader();
+        return _protocolHeader;
     }
 
     public <T extends Method> Interaction sendPerformative(final T 
performative) throws Exception

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a9e61c16/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java
 
b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java
index 1072f7c..d273f02 100644
--- 
a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java
+++ 
b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java
@@ -20,17 +20,12 @@
  */
 package org.apache.qpid.tests.protocol.v0_10;
 
-import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.lessThan;
 import static org.junit.Assume.assumeThat;
 
 import java.net.InetSocketAddress;
 
-import org.hamcrest.core.IsEqual;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -40,8 +35,6 @@ import 
org.apache.qpid.server.protocol.v0_10.transport.ConnectionSecure;
 import org.apache.qpid.server.protocol.v0_10.transport.ConnectionStart;
 import org.apache.qpid.server.protocol.v0_10.transport.ConnectionTune;
 import org.apache.qpid.tests.protocol.ChannelClosedResponse;
-import org.apache.qpid.tests.protocol.HeaderResponse;
-import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -58,29 +51,6 @@ public class ConnectionTest extends BrokerAdminUsingTestBase
     }
 
     @Test
-    @SpecificationTest(section = "4.3. Version Negotiation",
-            description = "When the client opens a new socket connection to an 
AMQP server,"
-                          + " it MUST send a protocol header with the client's 
preferred protocol version."
-                          + "If the requested protocol version is supported, 
the server MUST send its own protocol"
-                          + " header with the requested version to the socket, 
and then implement the protocol accordingly")
-    public void versionNegotiation() throws Exception
-    {
-        try(FrameTransport transport = new 
FrameTransport(_brokerAddress).connect())
-        {
-            final Interaction interaction = transport.newInteraction();
-            Response<?> response = 
interaction.negotiateProtocol().consumeResponse().getLatestResponse();
-            assertThat(response, is(instanceOf(HeaderResponse.class)));
-            assertThat(response.getBody(), 
is(IsEqual.equalTo(transport.getProtocolHeader())));
-
-            ConnectionStart connectionStart = 
interaction.consumeResponse().getLatestResponse(ConnectionStart.class);
-            assertThat(connectionStart.getMechanisms(), is(notNullValue()));
-            assertThat(connectionStart.getMechanisms(), 
contains(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS));
-            assertThat(connectionStart.getLocales(), is(notNullValue()));
-            assertThat(connectionStart.getLocales(), contains(DEFAULT_LOCALE));
-        }
-    }
-
-    @Test
     @SpecificationTest(section = "9.connection.start-ok",
             description = "An AMQP client MUST handle incoming 
connection.start controls.")
     public void startOk() throws Exception

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a9e61c16/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ProtocolTest.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ProtocolTest.java
 
b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ProtocolTest.java
new file mode 100644
index 0000000..35675e1
--- /dev/null
+++ 
b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ProtocolTest.java
@@ -0,0 +1,154 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertArrayEquals;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+
+import org.hamcrest.core.IsEqual;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionStart;
+import org.apache.qpid.tests.protocol.HeaderResponse;
+import org.apache.qpid.tests.protocol.Response;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+public class ProtocolTest extends BrokerAdminUsingTestBase
+{
+    private static final String DEFAULT_LOCALE = "en_US";
+    private InetSocketAddress _brokerAddress;
+
+    @Before
+    public void setUp()
+    {
+        _brokerAddress = 
getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+    }
+
+    @Test
+    @SpecificationTest(section = "4.3. Version Negotiation",
+            description = "When the client opens a new socket connection to an 
AMQP server,"
+                          + " it MUST send a protocol header with the client's 
preferred protocol version."
+                          + "If the requested protocol version is supported, 
the server MUST send its own protocol"
+                          + " header with the requested version to the socket, 
and then implement the protocol accordingly")
+    public void versionNegotiation() throws Exception
+    {
+        try(FrameTransport transport = new 
FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            Response<?> response = 
interaction.negotiateProtocol().consumeResponse().getLatestResponse();
+            assertThat(response, is(instanceOf(HeaderResponse.class)));
+            assertThat(response.getBody(), 
is(IsEqual.equalTo(transport.getProtocolHeader())));
+
+            ConnectionStart connectionStart = 
interaction.consumeResponse().getLatestResponse(ConnectionStart.class);
+            assertThat(connectionStart.getMechanisms(), is(notNullValue()));
+            assertThat(connectionStart.getMechanisms(), 
contains(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS));
+            assertThat(connectionStart.getLocales(), is(notNullValue()));
+            assertThat(connectionStart.getLocales(), contains(DEFAULT_LOCALE));
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "4.3. Version Negotiation",
+            description = "If the server can't parse the protocol header, the 
server MUST send a valid protocol "
+                          + "header with a supported protocol version and then 
close the socket.")
+    public void unrecognisedProtocolHeader() throws Exception
+    {
+        try(FrameTransport transport = new 
FrameTransport(_brokerAddress).connect())
+        {
+
+            final Interaction interaction = transport.newInteraction();
+
+            byte[] unknownHeader = 
"NOTANAMQPHEADER".getBytes(StandardCharsets.UTF_8);
+            byte[] expectedResponse = 
"AMQP\001\001\000\012".getBytes(StandardCharsets.UTF_8);
+            final byte[] response = interaction.protocolHeader(unknownHeader)
+                                               .negotiateProtocol()
+                                               
.consumeResponse().getLatestResponse(byte[].class);
+            assertArrayEquals("Unexpected protocol header response", 
expectedResponse, response);
+            transport.assertNoMoreResponsesAndChannelClosed();
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "4.3. Version Negotiation",
+            description = "If the requested protocol version is not supported, 
the server MUST send a protocol "
+                          + "header with a supported protocol version and then 
close the socket.")
+    public void unrecognisedProtocolVersion() throws Exception
+    {
+        try(FrameTransport transport = new 
FrameTransport(_brokerAddress).connect())
+        {
+
+
+            final Interaction interaction = transport.newInteraction();
+
+            byte[] unknownAmqpVersion = 
"AMQP\001\001\000\013".getBytes(StandardCharsets.UTF_8);
+            byte[] expectedResponse = 
"AMQP\001\001\000\012".getBytes(StandardCharsets.UTF_8);
+            final byte[] response = 
interaction.protocolHeader(unknownAmqpVersion)
+                                               .negotiateProtocol()
+                                               
.consumeResponse().getLatestResponse(byte[].class);
+            assertArrayEquals("Unexpected protocol header response", 
expectedResponse, response);
+            transport.assertNoMoreResponsesAndChannelClosed();
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "8. Domains", description = "valid values for 
the frame type indicator.")
+    public void invalidSegmentType() throws Exception
+    {
+        try(FrameTransport transport = new 
FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+
+            interaction.negotiateProtocol().consumeResponse()
+                       .consumeResponse(ConnectionStart.class);
+
+            try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                 DataOutputStream dos = new DataOutputStream(bos))
+            {
+                dos.writeByte(0);   /* flags */
+                dos.writeByte(4);   /* segment type - undefined value in 0-10 
*/
+                dos.writeShort(12); /* size */
+                dos.writeByte(0);
+                dos.writeByte(0);   /* track */
+                dos.writeShort(0);  /* channel */
+                dos.writeByte(0);
+                dos.writeByte(0);
+                dos.writeByte(0);
+                dos.writeByte(0);
+
+                transport.sendBytes(bos.toByteArray());
+            }
+            transport.flush();
+            transport.assertNoMoreResponsesAndChannelClosed();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a9e61c16/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
 
b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
index c004e38..718c41d 100644
--- 
a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
+++ 
b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.protocol.v0_8.AMQShortString;
 import org.apache.qpid.server.protocol.v0_8.FieldTable;
 import org.apache.qpid.server.protocol.v0_8.transport.AMQFrame;
@@ -125,6 +126,25 @@ public class BasicInteraction
         return this;
     }
 
+    public Interaction contentHeader(int contentSize) throws Exception
+    {
+        final BasicContentHeaderProperties basicContentHeaderProperties = new 
BasicContentHeaderProperties();
+        
basicContentHeaderProperties.setHeaders(FieldTable.convertToFieldTable(_contentHeaderPropertiesHeaders));
+        
basicContentHeaderProperties.setContentType(_contentHeaderPropertiesContentType);
+        
basicContentHeaderProperties.setDeliveryMode(_contentHeaderPropertiesDeliveryMode);
+        
basicContentHeaderProperties.setPriority(_contentHeaderPropertiesPriority);
+        ContentHeaderBody contentHeaderBody = new 
ContentHeaderBody(basicContentHeaderProperties, contentSize);
+        return _interaction.sendPerformative(contentHeaderBody);
+    }
+
+    public Interaction contentBody(final byte[] bytes) throws Exception
+    {
+        try (QpidByteBuffer buf = QpidByteBuffer.wrap(bytes))
+        {
+            final ContentBody contentBody = new ContentBody(buf);
+            return _interaction.sendPerformative(contentBody);
+        }
+    }
 
     public Interaction publishMessage() throws Exception
     {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a9e61c16/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java
 
b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java
index 432eab8..cee4c0f 100644
--- 
a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java
+++ 
b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java
@@ -76,7 +76,7 @@ public class FrameTransport extends 
AbstractFrameTransport<Interaction>
         return _protocolHeader;
     }
 
-    ProtocolVersion getProtocolVersion()
+    public ProtocolVersion getProtocolVersion()
     {
         return _protocolVersion;
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a9e61c16/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
 
b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
index b990eae..5e89af8 100644
--- 
a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
+++ 
b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
@@ -30,7 +30,7 @@ import org.apache.qpid.tests.protocol.AbstractInteraction;
 
 public class Interaction extends AbstractInteraction<Interaction>
 {
-
+    private byte[] _protocolHeader;
     private int _channelId;
     private int _maximumPayloadSize = 512;
     private ConnectionInteraction _connectionInteraction;
@@ -49,12 +49,20 @@ public class Interaction extends 
AbstractInteraction<Interaction>
         _basicInteraction = new BasicInteraction(this);
         _txInteraction = new TxInteraction(this);
         _exchangeInteraction = new ExchangeInteraction(this);
+        _protocolHeader = getTransport().getProtocolHeader();
+    }
+
+    @Override
+    public Interaction protocolHeader(final byte[] header)
+    {
+        _protocolHeader = header;
+        return this;
     }
 
     @Override
     protected byte[] getProtocolHeader()
     {
-        return getTransport().getProtocolHeader();
+        return _protocolHeader;
     }
 
     public Interaction sendPerformative(final AMQBody amqBody) throws Exception

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a9e61c16/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java
 
b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java
index fa4a692..9c693a3 100644
--- 
a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java
+++ 
b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java
@@ -29,6 +29,7 @@ import java.net.InetSocketAddress;
 import org.junit.Before;
 import org.junit.Test;
 
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ConnectionOpenOkBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ConnectionSecureBody;
@@ -153,6 +154,50 @@ public class ConnectionTest extends 
BrokerAdminUsingTestBase
     }
 
     @Test
+    @SpecificationTest(section = "4.2.3",
+            description = "A peer MUST NOT send frames larger than the 
agreed-upon size. A peer that receives an "
+                          + "oversized frame MUST signal a connection 
exception with reply code 501 (frame error).")
+    public void overlySizedContentBodyFrame() throws Exception
+    {
+        try(FrameTransport transport = new 
FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            ConnectionTuneBody response = interaction.negotiateProtocol()
+                                                     
.consumeResponse(ConnectionStartBody.class)
+                                                     
.connection().startOkMechanism("ANONYMOUS")
+                                                     .startOk()
+                                                     
.consumeResponse().getLatestResponse(ConnectionTuneBody.class);
+
+            final long frameMax = response.getFrameMax();
+            // Older Qpid JMS Client 0-x had a defect that meant they could 
send content body frames that were too
+            // large.  Rather than limiting the user content of each frame to 
frameSize - 8, it sent frameSize bytes
+            // of user content meaning the resultant frame was too big.  The 
server accommodates this behaviour
+            // by reducing the frame-size advertised to the client.
+            final int overlyLargeFrameBodySize = (int) (frameMax + 1);  // 
Should be frameMax - 8 + 1.
+            final byte[] bodyBytes = new byte[overlyLargeFrameBodySize];
+
+            interaction.connection()
+                       .tuneOkChannelMax(response.getChannelMax())
+                       .tuneOkFrameMax(frameMax)
+                       .tuneOkHeartbeat(response.getHeartbeat())
+                       .tuneOk()
+                       .connection().open()
+                       .consumeResponse(ConnectionOpenOkBody.class)
+                       .channel().open()
+                       .consumeResponse(ChannelOpenOkBody.class)
+                       .basic().publish()
+                       .basic().contentHeader(bodyBytes.length)
+                       .basic().contentBody(bodyBytes)
+                       .sync();
+
+            final ChannelClosedResponse closeResponse = 
interaction.consumeResponse()
+                                                                   
.getLatestResponse(ChannelClosedResponse.class);
+            //TODO: The ChannelClosedResponse is wrong.
+            //assertThat(res.getReplyCode(), 
CoreMatchers.is(CoreMatchers.equalTo(ErrorCodes.COMMAND_INVALID)));
+        }
+    }
+
+    @Test
     @SpecificationTest(section = "1.4.", description = "open connection = 
C:protocolheader S:START C:START OK"
                                                        + " *challenge S:TUNE 
C:TUNE OK C:OPEN S:OPEN OK")
     public void authenticationBypassBySendingTuneOk() throws Exception
@@ -201,5 +246,4 @@ public class ConnectionTest extends BrokerAdminUsingTestBase
                        .consumeResponse(ConnectionCloseBody.class, 
ChannelClosedResponse.class);
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a9e61c16/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ProtocolTest.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ProtocolTest.java
 
b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ProtocolTest.java
new file mode 100644
index 0000000..8475bb9
--- /dev/null
+++ 
b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ProtocolTest.java
@@ -0,0 +1,154 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assume.assumeThat;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.ProtocolVersion;
+import org.apache.qpid.server.protocol.v0_8.transport.AMQBody;
+import 
org.apache.qpid.server.protocol.v0_8.transport.AMQVersionAwareProtocolSession;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionStartBody;
+import org.apache.qpid.server.transport.ByteBufferSender;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+public class ProtocolTest extends BrokerAdminUsingTestBase
+{
+    private InetSocketAddress _brokerAddress;
+
+    @Before
+    public void setUp()
+    {
+        _brokerAddress = 
getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+    }
+
+    @Test
+    @SpecificationTest(section = "4.2.2",
+            description = "If the server does not recognise the first 5 octets 
of data on the socket [...], it MUST "
+                          + "write a valid protocol header to the socket, 
[...] and then close the socket connection.")
+    public void unrecognisedProtocolHeader() throws Exception
+    {
+        try(FrameTransport transport = new 
FrameTransport(_brokerAddress).connect())
+        {
+            assumeThat(transport.getProtocolVersion(), 
is(equalTo(ProtocolVersion.v0_91)));
+
+            final Interaction interaction = transport.newInteraction();
+
+            byte[] unknownHeader = 
"NOTANAMQPHEADER".getBytes(StandardCharsets.UTF_8);
+            byte[] expectedResponse = 
"AMQP\000\000\011\001".getBytes(StandardCharsets.UTF_8);
+            final byte[] response = interaction.protocolHeader(unknownHeader)
+                                               .negotiateProtocol()
+                                               
.consumeResponse().getLatestResponse(byte[].class);
+            assertArrayEquals("Unexpected protocol header response", 
expectedResponse, response);
+            transport.assertNoMoreResponsesAndChannelClosed();
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "4.2.2",
+            description = "If the server [...] does not support the specific 
protocol version that the client "
+                          + "requests, it MUST write a valid protocol header 
to the socket, [...] and then close "
+                          + "the socket connection.")
+    public void unrecognisedProtocolVersion() throws Exception
+    {
+        try(FrameTransport transport = new 
FrameTransport(_brokerAddress).connect())
+        {
+            assumeThat(transport.getProtocolVersion(), 
is(equalTo(ProtocolVersion.v0_91)));
+
+            final Interaction interaction = transport.newInteraction();
+
+            byte[] unknownAmqpVersion = 
"AMQP\000\000\010\002".getBytes(StandardCharsets.UTF_8);
+            byte[] expectedResponse = 
"AMQP\000\000\011\001".getBytes(StandardCharsets.UTF_8);
+            final byte[] response = 
interaction.protocolHeader(unknownAmqpVersion)
+                                               .negotiateProtocol()
+                                               
.consumeResponse().getLatestResponse(byte[].class);
+            assertArrayEquals("Unexpected protocol header response", 
expectedResponse, response);
+            transport.assertNoMoreResponsesAndChannelClosed();
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "4.2.2", description = "The server either 
accepts [...] the protocol header")
+    public void validProtocolVersion() throws Exception
+    {
+        try(FrameTransport transport = new 
FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+
+            interaction.negotiateProtocol()
+                       
.consumeResponse().getLatestResponse(ConnectionStartBody.class);
+
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "4.2.2",
+            description = "If a peer receives a frame with a type that is not 
one of these defined types, it MUST "
+                          + "treat this as a fatal protocol error and close 
the connection without sending any "
+                          + "further data on it")
+    public void unrecognisedFrameType() throws Exception
+    {
+        try(FrameTransport transport = new 
FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+
+            interaction.negotiateProtocol()
+                       .consumeResponse(ConnectionStartBody.class)
+                       .sendPerformative(new AMQBody()
+                       {
+                           @Override
+                           public byte getFrameType()
+                           {
+                               return (byte)5;  // Spec defines 1 - 4 only.
+                           }
+
+                           @Override
+                           public int getSize()
+                           {
+                               return 0;
+                           }
+
+                           @Override
+                           public long writePayload(final ByteBufferSender 
sender)
+                           {
+                               return 0;
+                           }
+
+                           @Override
+                           public void handle(final int channelId, final 
AMQVersionAwareProtocolSession session)
+                           {
+                               throw new UnsupportedOperationException();
+                           }
+                       }).sync();
+            transport.assertNoMoreResponsesAndChannelClosed();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a9e61c16/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index b2f8147..7a372a7 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -166,6 +166,7 @@ public class Interaction extends AbstractInteraction<Interaction>
     // Protocol Negotiation //
     /////////////////////////
 
+    @Override
     public Interaction protocolHeader(byte[] header)
     {
         _protocolHeader = header;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a9e61c16/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractFrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractFrameTransport.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractFrameTransport.java
index cad8415..91b0454 100644
--- a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractFrameTransport.java
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractFrameTransport.java
@@ -131,6 +131,11 @@ public abstract class AbstractFrameTransport<I extends AbstractInteraction<I>> i
 
     ListenableFuture<Void> sendProtocolHeader(final byte[] bytes) throws Exception
     {
+        return sendBytes(bytes);
+    }
+
+    public ListenableFuture<Void> sendBytes(final byte[] bytes)
+    {
         Preconditions.checkState(_channel != null, "Not connected");
         ChannelPromise promise = _channel.newPromise();
         ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a9e61c16/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java
index 4b41ca9..2c977f3 100644
--- a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java
@@ -141,6 +141,8 @@ public abstract class AbstractInteraction<I extends AbstractInteraction<I>>
         return _transport;
     }
 
+    public abstract I protocolHeader(final byte[] header);
+
     protected abstract byte[] getProtocolHeader();
 
     private I getInteraction()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to