QPID-8038: [Broker-J][AMQP 0-10] Add protocol tests for AMQP 0-10

Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/ff2980e2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/ff2980e2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/ff2980e2

Branch: refs/heads/master
Commit: ff2980e2d6e9520ba204acd41a78e9ee412a2c11
Parents: 612c2cb
Author: Alex Rudyy <oru...@apache.org>
Authored: Tue Nov 21 17:16:42 2017 +0000
Committer: Alex Rudyy <oru...@apache.org>
Committed: Wed Nov 22 15:02:23 2017 +0000

----------------------------------------------------------------------
 pom.xml                                         |  47 ++--
 systests/protocol-tests-amqp-0-10/pom.xml       | 109 +++++++
 .../qpid/tests/protocol/v0_10/Assembler.java    | 264 +++++++++++++++++
 .../protocol/v0_10/ConnectionInteraction.java   |  83 ++++++
 .../qpid/tests/protocol/v0_10/Disassembler.java | 281 +++++++++++++++++++
 .../tests/protocol/v0_10/ErrorResponse.java     |  40 +++
 .../protocol/v0_10/ExecutionInteraction.java    |  47 ++++
 .../qpid/tests/protocol/v0_10/FrameDecoder.java | 190 +++++++++++++
 .../qpid/tests/protocol/v0_10/FrameEncoder.java |  87 ++++++
 .../tests/protocol/v0_10/FrameTransport.java    |  58 ++++
 .../qpid/tests/protocol/v0_10/Interaction.java  | 138 +++++++++
 .../protocol/v0_10/MessageInteraction.java      | 147 ++++++++++
 .../protocol/v0_10/PerformativeResponse.java    |  48 ++++
 .../protocol/v0_10/ProtocolEventReceiver.java   |  67 +++++
 .../protocol/v0_10/SessionInteraction.java      |  89 ++++++
 .../tests/protocol/v0_10/TxInteraction.java     |  60 ++++
 .../resources/config-protocol-tests-0-10.json   |  78 +++++
 .../tests/protocol/v0_10/ConnectionTest.java    | 214 ++++++++++++++
 .../qpid/tests/protocol/v0_10/MessageTest.java  | 260 +++++++++++++++++
 .../qpid/tests/protocol/v0_10/SessionTest.java  | 123 ++++++++
 .../tests/protocol/v0_10/TransactionTest.java   |  92 ++++++
 .../protocol/v0_8/ExchangeInteraction.java      |  58 ++++
 .../tests/protocol/v0_8/FrameTransport.java     |  12 +-
 .../qpid/tests/protocol/v0_8/Interaction.java   |  23 +-
 .../qpid/tests/protocol/v0_8/TxInteraction.java |  44 +++
 .../qpid/tests/protocol/v0_8/ChannelTest.java   |   2 +-
 .../tests/protocol/v0_8/ConnectionTest.java     |  26 +-
 .../tests/protocol/v0_8/TransactionTest.java    |  79 ++++++
 .../tests/protocol/v1_0/FrameTransport.java     |   4 +-
 .../qpid/tests/protocol/v1_0/Interaction.java   |   9 +-
 .../tests/protocol/AbstractFrameTransport.java  | 175 ++++++++++++
 .../tests/protocol/AbstractInteraction.java     | 150 ++++++++++
 .../tests/protocol/ChannelClosedResponse.java   |  36 +++
 .../qpid/tests/protocol/FrameTransport.java     | 190 -------------
 .../apache/qpid/tests/protocol/Interaction.java | 147 ----------
 35 files changed, 3097 insertions(+), 380 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e7d58bb..0c0c445 100644
--- a/pom.xml
+++ b/pom.xml
@@ -198,6 +198,7 @@
     <module>systests/qpid-systests-jms_2.0</module>
     <module>systests/protocol-tests-core</module>
     <module>systests/protocol-tests-amqp-0-8</module>
+    <module>systests/protocol-tests-amqp-0-10</module>
     <module>systests/protocol-tests-amqp-1-0</module>
     <module>systests/end-to-end-conversion-tests</module>
     <module>perftests</module>
@@ -427,6 +428,12 @@
         <version>${project.version}</version>
       </dependency>
 
+      <dependency>
+        <groupId>org.apache.qpid</groupId>
+        <artifactId>protocol-tests-amqp-0-10</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
       <!-- External dependencies -->
       <dependency>
         <groupId>org.apache.qpid</groupId>
@@ -1332,26 +1339,26 @@
       </properties>
     </profile>
 
-      <profile>
-          <id>java-json.0-10</id>
-          <activation>
-              <property>
-                  <name>profile</name>
-                  <value>java-json.0-10</value>
-              </property>
-          </activation>
-          <properties>
-              <profile>java-json.0-10</profile>
-              <profile.specific.excludes>JavaPersistentExcludes 
JavaJsonExcludes XAExcludes Java010Excludes</profile.specific.excludes>
-              <profile.broker.version>v0_10</profile.broker.version>
-              
<profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9","AMQP_0_9_1","AMQP_0_10"]</profile.test.amqp_port_protocols>
-              <profile.broker.persistent>true</profile.broker.persistent>
-              <profile.virtualhostnode.type>JSON</profile.virtualhostnode.type>
-              
<profile.virtualhostnode.context.blueprint>{"type":"BDB","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint>
-          </properties>
-      </profile>
-
-      <profile>
+    <profile>
+      <id>java-json.0-10</id>
+      <activation>
+        <property>
+          <name>profile</name>
+          <value>java-json.0-10</value>
+        </property>
+      </activation>
+      <properties>
+        <profile>java-json.0-10</profile>
+        <profile.specific.excludes>JavaPersistentExcludes JavaJsonExcludes 
XAExcludes Java010Excludes</profile.specific.excludes>
+        <profile.broker.version>v0_10</profile.broker.version>
+        
<profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9","AMQP_0_9_1","AMQP_0_10"]</profile.test.amqp_port_protocols>
+        <profile.broker.persistent>true</profile.broker.persistent>
+        <profile.virtualhostnode.type>JSON</profile.virtualhostnode.type>
+        
<profile.virtualhostnode.context.blueprint>{"type":"BDB","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint>
+      </properties>
+    </profile>
+
+    <profile>
       <id>cpp</id>
       <activation>
         <property>

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/pom.xml
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/pom.xml 
b/systests/protocol-tests-amqp-0-10/pom.xml
new file mode 100644
index 0000000..3acf129
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/pom.xml
@@ -0,0 +1,109 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <groupId>org.apache.qpid</groupId>
+        <artifactId>qpid-systests-parent</artifactId>
+        <version>7.1.0-SNAPSHOT</version>
+        <relativePath>../../qpid-systests-parent/pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>protocol-tests-amqp-0-10</artifactId>
+    <name>Apache Qpid Protocol Tests for AMQP 0-10</name>
+    <description>Tests for AMQP 0-10</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-broker-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-broker-plugins-amqp-0-10-protocol</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-test-utils</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>protocol-tests-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-systests-utils</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-broker-plugins-logging-logback</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-broker-plugins-memory-store</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-broker-plugins-derby-store</artifactId>
+            <optional>true</optional>
+            <scope>test</scope>
+        </dependency>
+
+       <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-bdbstore</artifactId>
+            <scope>test</scope>
+       </dependency>
+
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-library</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-integration</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <systemPropertyVariables>
+                        
<qpid.initialConfigurationLocation>classpath:config-protocol-tests-0-10.json</qpid.initialConfigurationLocation>
+                    </systemPropertyVariables>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Assembler.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Assembler.java
 
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Assembler.java
new file mode 100644
index 0000000..aac39b6
--- /dev/null
+++ 
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Assembler.java
@@ -0,0 +1,264 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v0_10.transport.BBDecoder;
+import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.Frame;
+import org.apache.qpid.server.protocol.v0_10.transport.Header;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.Method;
+import org.apache.qpid.server.protocol.v0_10.transport.NetworkDelegate;
+import org.apache.qpid.server.protocol.v0_10.transport.NetworkEvent;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolError;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolEvent;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolHeader;
+import org.apache.qpid.server.protocol.v0_10.transport.Struct;
+
+public class Assembler implements NetworkDelegate
+{
+
+    private static final int ARRAY_SIZE = 0xFF;
+    private final Method[] _incompleteMethodArray = new Method[ARRAY_SIZE + 1];
+    private final Map<Integer, Method> _incompleteMethodMap = new HashMap<>();
+
+    private final ProtocolEventReceiver receiver;
+    private final Map<Integer, List<Frame>> segments;
+    private static final ThreadLocal<BBDecoder> _decoder = 
ThreadLocal.withInitial(BBDecoder::new);
+
+    Assembler(ProtocolEventReceiver receiver)
+    {
+        this.receiver = receiver;
+        segments = new HashMap<>();
+    }
+
+    private int segmentKey(Frame frame)
+    {
+        return (frame.getTrack() + 1) * frame.getChannel();
+    }
+
+    private List<Frame> getSegment(Frame frame)
+    {
+        return segments.get(segmentKey(frame));
+    }
+
+    private void setSegment(Frame frame, List<Frame> segment)
+    {
+        int key = segmentKey(frame);
+        if (segments.containsKey(key))
+        {
+            error(new ProtocolError(Frame.L2, "segment in progress: %s",
+                                    frame));
+        }
+        segments.put(segmentKey(frame), segment);
+    }
+
+    private void clearSegment(Frame frame)
+    {
+        segments.remove(segmentKey(frame));
+    }
+
+    private void emit(int channel, ProtocolEvent event)
+    {
+        event.setChannel(channel);
+        receiver.received(event);
+    }
+
+    void received(NetworkEvent event)
+    {
+        event.delegate(this);
+    }
+
+    public void init(ProtocolHeader header)
+    {
+        emit(0, header);
+    }
+
+    public void error(ProtocolError error)
+    {
+        emit(0, error);
+    }
+
+    public void frame(Frame frame)
+    {
+        ByteBuffer segment;
+        if (frame.isFirstFrame() && frame.isLastFrame())
+        {
+            segment = frame.getBody();
+            assemble(frame, segment);
+        }
+        else
+        {
+            List<Frame> frames;
+            if (frame.isFirstFrame())
+            {
+                frames = new ArrayList<>();
+                setSegment(frame, frames);
+            }
+            else
+            {
+                frames = getSegment(frame);
+            }
+
+            frames.add(frame);
+
+            if (frame.isLastFrame())
+            {
+                clearSegment(frame);
+
+                int size = 0;
+                for (Frame f : frames)
+                {
+                    size += f.getSize();
+                }
+                segment = allocateByteBuffer(size);
+                for (Frame f : frames)
+                {
+                    segment.put(f.getBody());
+                }
+                segment.flip();
+                assemble(frame, segment);
+            }
+        }
+    }
+
+    private ByteBuffer allocateByteBuffer(final int size)
+    {
+        return ByteBuffer.allocate(size);
+    }
+
+    private void assemble(Frame frame, ByteBuffer segment)
+    {
+        BBDecoder dec = _decoder.get();
+        dec.init(segment);
+
+        int channel = frame.getChannel();
+        Method command;
+
+        switch (frame.getType())
+        {
+            case CONTROL:
+                int controlType = dec.readUint16();
+                Method control = Method.create(controlType);
+                control.read(dec);
+                emit(channel, control);
+                break;
+            case COMMAND:
+                int commandType = dec.readUint16();
+                // read in the session header, right now we don't use it
+                int hdr = dec.readUint16();
+                command = Method.create(commandType);
+                command.setSync((0x0001 & hdr) != 0);
+                command.read(dec);
+                if (command.hasPayload() && !frame.isLastSegment())
+                {
+                    setIncompleteCommand(channel, command);
+                }
+                else
+                {
+                    emit(channel, command);
+                }
+                break;
+            case HEADER:
+                command = getIncompleteCommand(channel);
+                List<Struct> structs = null;
+                DeliveryProperties deliveryProps = null;
+                MessageProperties messageProps = null;
+
+                while (dec.hasRemaining())
+                {
+                    Struct struct = dec.readStruct32();
+                    if (struct instanceof DeliveryProperties && deliveryProps 
== null)
+                    {
+                        deliveryProps = (DeliveryProperties) struct;
+                    }
+                    else if (struct instanceof MessageProperties && 
messageProps == null)
+                    {
+                        messageProps = (MessageProperties) struct;
+                    }
+                    else
+                    {
+                        if (structs == null)
+                        {
+                            structs = new ArrayList<>(2);
+                        }
+                        structs.add(struct);
+                    }
+                }
+                command.setHeader(new Header(deliveryProps, messageProps, 
structs));
+
+                if (frame.isLastSegment())
+                {
+                    setIncompleteCommand(channel, null);
+                    emit(channel, command);
+                }
+                break;
+            case BODY:
+                command = getIncompleteCommand(channel);
+                command.setBody(QpidByteBuffer.wrap(segment));
+                setIncompleteCommand(channel, null);
+                emit(channel, command);
+                break;
+            default:
+                throw new IllegalStateException("unknown frame type: " + 
frame.getType());
+        }
+
+        dec.releaseBuffer();
+    }
+
+    private void setIncompleteCommand(int channelId, Method incomplete)
+    {
+        if ((channelId & ARRAY_SIZE) == channelId)
+        {
+            _incompleteMethodArray[channelId] = incomplete;
+        }
+        else
+        {
+            if (incomplete != null)
+            {
+                _incompleteMethodMap.put(channelId, incomplete);
+            }
+            else
+            {
+                _incompleteMethodMap.remove(channelId);
+            }
+        }
+    }
+
+    private Method getIncompleteCommand(int channelId)
+    {
+        if ((channelId & ARRAY_SIZE) == channelId)
+        {
+            return _incompleteMethodArray[channelId];
+        }
+        else
+        {
+            return _incompleteMethodMap.get(channelId);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java
 
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java
new file mode 100644
index 0000000..d7b54b0
--- /dev/null
+++ 
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionOpen;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionStartOk;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionTuneOk;
+
+public class ConnectionInteraction
+{
+    public static final String SASL_MECHANISM_ANONYMOUS = "ANONYMOUS";
+    public static final String SASL_MECHANISM_PLAIN = "PLAIN";
+
+    private final Interaction _interaction;
+    private ConnectionStartOk _startOk;
+    private ConnectionTuneOk _tuneOk;
+    private ConnectionOpen _open;
+
+    public ConnectionInteraction(final Interaction interaction)
+    {
+        _interaction = interaction;
+        _startOk = new ConnectionStartOk();
+        _tuneOk = new ConnectionTuneOk();
+        _open = new ConnectionOpen();
+    }
+
+    public Interaction startOk() throws Exception
+    {
+        return _interaction.sendPerformative(_startOk);
+    }
+
+    public ConnectionInteraction startOkMechanism(final String mechanism)
+    {
+        _startOk.setMechanism(mechanism);
+        return this;
+    }
+
+    public Interaction tuneOk() throws Exception
+    {
+        return _interaction.sendPerformative(_tuneOk);
+    }
+
+    public Interaction open() throws Exception
+    {
+        return _interaction.sendPerformative(_open);
+    }
+
+    public ConnectionInteraction tuneOkChannelMax(final int channelMax)
+    {
+        _tuneOk.setChannelMax(channelMax);
+        return this;
+    }
+
+    public ConnectionInteraction tuneOkMaxFrameSize(final int maxFrameSize)
+    {
+        _tuneOk.setMaxFrameSize(maxFrameSize);
+        return this;
+    }
+
+    public ConnectionInteraction startOkResponse(final byte[] response)
+    {
+        _startOk.setResponse(response);
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Disassembler.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Disassembler.java
 
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Disassembler.java
new file mode 100644
index 0000000..e60049e
--- /dev/null
+++ 
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Disassembler.java
@@ -0,0 +1,281 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import static java.lang.Math.min;
+import static 
org.apache.qpid.server.protocol.v0_10.transport.Frame.FIRST_FRAME;
+import static org.apache.qpid.server.protocol.v0_10.transport.Frame.FIRST_SEG;
+import static 
org.apache.qpid.server.protocol.v0_10.transport.Frame.HEADER_SIZE;
+import static org.apache.qpid.server.protocol.v0_10.transport.Frame.LAST_FRAME;
+import static org.apache.qpid.server.protocol.v0_10.transport.Frame.LAST_SEG;
+
+import java.nio.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v0_10.FrameSizeObserver;
+import org.apache.qpid.server.protocol.v0_10.ProtocolEventSender;
+import org.apache.qpid.server.protocol.v0_10.transport.BBEncoder;
+import org.apache.qpid.server.protocol.v0_10.transport.Frame;
+import org.apache.qpid.server.protocol.v0_10.transport.Header;
+import org.apache.qpid.server.protocol.v0_10.transport.Method;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolDelegate;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolError;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolEvent;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolHeader;
+import org.apache.qpid.server.protocol.v0_10.transport.SegmentType;
+import org.apache.qpid.server.protocol.v0_10.transport.Struct;
+import org.apache.qpid.server.transport.ByteBufferSender;
+
+/**
+ * Disassembler
+ */
+public final class Disassembler implements ProtocolEventSender, 
ProtocolDelegate<Void>, FrameSizeObserver
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(Disassembler.class);
+    private final ByteBufferSender _sender;
+    private final Object _sendlock = new Object();
+    private volatile int _maxPayload;
+    private final static ThreadLocal<BBEncoder> _encoder = new 
ThreadLocal<BBEncoder>()
+    {
+        public BBEncoder initialValue()
+        {
+            return new BBEncoder(4 * 1024);
+        }
+    };
+
+    public Disassembler(ByteBufferSender sender, int maxFrame)
+    {
+        _sender = sender;
+        if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024)
+        {
+            throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE 
and < 64K: " + maxFrame);
+        }
+        _maxPayload = maxFrame - HEADER_SIZE;
+    }
+
+    public void send(ProtocolEvent event)
+    {
+        event.delegate(null, this);
+    }
+
+    public void flush()
+    {
+        synchronized (_sendlock)
+        {
+            _sender.flush();
+        }
+    }
+
+    public void close()
+    {
+        synchronized (_sendlock)
+        {
+            _sender.close();
+        }
+    }
+
+    public void init(Void v, ProtocolHeader header)
+    {
+        synchronized (_sendlock)
+        {
+            _sender.send(header.toByteBuffer());
+            _sender.flush();
+        }
+    }
+
+    public void control(Void v, Method method)
+    {
+        method(method, SegmentType.CONTROL);
+    }
+
+    public void command(Void v, Method method)
+    {
+        method(method, SegmentType.COMMAND);
+    }
+
+    private void method(Method method, SegmentType type)
+    {
+        BBEncoder enc = _encoder.get();
+        enc.init();
+        enc.writeUint16(method.getEncodedType());
+        if (type == SegmentType.COMMAND)
+        {
+            if (method.isSync())
+            {
+                enc.writeUint16(0x0101);
+            }
+            else
+            {
+                enc.writeUint16(0x0100);
+            }
+        }
+        method.write(enc);
+        int methodLimit = enc.position();
+
+        byte flags = FIRST_SEG;
+
+        boolean payload = method.hasPayload();
+        if (!payload)
+        {
+            flags |= LAST_SEG;
+        }
+
+        int headerLimit = -1;
+        if (payload)
+        {
+            final Header hdr = method.getHeader();
+            if (hdr != null)
+            {
+                if(hdr.getDeliveryProperties() != null)
+                {
+                    enc.writeStruct32(hdr.getDeliveryProperties());
+                }
+                if(hdr.getMessageProperties() != null)
+                {
+                    enc.writeStruct32(hdr.getMessageProperties());
+                }
+                if(hdr.getNonStandardProperties() != null)
+                {
+                    for (Struct st : hdr.getNonStandardProperties())
+                    {
+                        enc.writeStruct32(st);
+                    }
+                }
+            }
+            headerLimit = enc.position();
+        }
+
+        synchronized (_sendlock)
+        {
+            ByteBuffer buf = enc.underlyingBuffer();
+            buf.flip();
+            ByteBuffer copy = ByteBuffer.allocate(buf.remaining());
+            copy.put(buf.duplicate());
+            copy.flip();
+
+            final ByteBuffer methodBuf = view(copy,0, methodLimit);
+            fragment(flags, type, method, methodBuf);
+            if (payload)
+            {
+                QpidByteBuffer qpidByteBuffer = method.getBody();
+                ByteBuffer body = null;
+                if (qpidByteBuffer != null)
+                {
+                    body = ByteBuffer.allocate(qpidByteBuffer.remaining());
+                    qpidByteBuffer.copyTo(body);
+                }
+                ByteBuffer headerBuf = view(copy, methodLimit, headerLimit);
+                fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, 
method, headerBuf);
+                if (body != null)
+                {
+                    fragment(LAST_SEG, SegmentType.BODY, method, 
body.duplicate());
+                }
+            }
+        }
+    }
+
+    private void fragment(byte flags, SegmentType type, ProtocolEvent event, 
ByteBuffer buffer)
+    {
+        byte typeb = (byte) type.getValue();
+        byte track = event.getEncodedTrack() == Frame.L4 ? (byte) 1 : (byte) 0;
+
+        int remaining = buffer.remaining();
+        boolean first = true;
+        while (true)
+        {
+            int size = min(_maxPayload, remaining);
+            remaining -= size;
+
+            byte newflags = flags;
+            if (first)
+            {
+                newflags |= FIRST_FRAME;
+                first = false;
+            }
+            if (remaining == 0)
+            {
+                newflags |= LAST_FRAME;
+            }
+
+            frame(newflags, typeb, track, event.getChannel(), size, buffer);
+
+            if (remaining == 0)
+            {
+                break;
+            }
+        }
+    }
+
+    private void frame(byte flags, byte type, byte track, int channel, int 
size, ByteBuffer buffer)
+    {
+        ByteBuffer data = ByteBuffer.allocate(HEADER_SIZE);
+
+        data.put(0, flags);
+        data.put(1, type);
+        data.putShort(2, (short) (size + HEADER_SIZE));
+        data.put(4, (byte) 0);
+        data.put(5, track);
+        data.putShort(6, (short) channel);
+
+        try (QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(data))
+        {
+            _sender.send(qpidByteBuffer);
+        }
+
+        if(size > 0)
+        {
+            final ByteBuffer view = view(buffer, 0, size);
+            try (QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(view))
+            {
+                _sender.send(qpidByteBuffer);
+            }
+            buffer.position(buffer.position() + size);
+        }
+    }
+
+    public void error(Void v, ProtocolError error)
+    {
+        throw new IllegalArgumentException(String.valueOf(error));
+    }
+
+    @Override
+    public void setMaxFrameSize(final int maxFrame)
+    {
+        if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024)
+        {
+            throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE 
and < 64K: " + maxFrame);
+        }
+        _maxPayload = maxFrame - HEADER_SIZE;
+
+    }
+
+    private static ByteBuffer view(ByteBuffer buffer, int offset, int length)
+    {
+        ByteBuffer view = buffer.slice();
+        view.position(offset);
+        int newLimit = Math.min(view.position() + length, view.capacity());
+        view.limit(newLimit);
+        return view.slice();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ErrorResponse.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ErrorResponse.java
 
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ErrorResponse.java
new file mode 100644
index 0000000..fa79489
--- /dev/null
+++ 
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ErrorResponse.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolError;
+import org.apache.qpid.tests.protocol.Response;
+
+public class ErrorResponse implements Response<ProtocolError>
+{
+    private final ProtocolError _error;
+
+    public ErrorResponse(final ProtocolError protocolError)
+    {
+        _error = protocolError;
+    }
+
+    @Override
+    public ProtocolError getBody()
+    {
+        return _error;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ExecutionInteraction.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ExecutionInteraction.java
 
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ExecutionInteraction.java
new file mode 100644
index 0000000..2e6817b
--- /dev/null
+++ 
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ExecutionInteraction.java
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import org.apache.qpid.server.protocol.v0_10.transport.ExecutionSync;
+
+public class ExecutionInteraction
+{
+    private final Interaction _interaction;
+    private final ExecutionSync _sync;
+
+    public ExecutionInteraction(final Interaction interaction)
+    {
+        _interaction = interaction;
+        _sync = new ExecutionSync();
+    }
+
+    public ExecutionInteraction syncId(final int id)
+    {
+        _sync.setId(id);
+        return this;
+    }
+
+    public Interaction sync() throws Exception
+    {
+        _interaction.sendPerformative(_sync);
+        return _interaction;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameDecoder.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameDecoder.java
 
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameDecoder.java
new file mode 100644
index 0000000..fff894b
--- /dev/null
+++ 
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameDecoder.java
@@ -0,0 +1,190 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import static org.apache.qpid.server.transport.util.Functions.str;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Collection;
+
+import org.apache.qpid.server.protocol.v0_10.transport.Frame;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolError;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolHeader;
+import org.apache.qpid.server.protocol.v0_10.transport.SegmentType;
+import org.apache.qpid.tests.protocol.InputDecoder;
+import org.apache.qpid.tests.protocol.Response;
+
+public class FrameDecoder implements InputDecoder
+{
+
+    private final ProtocolEventReceiver _receiver;
+
+    public enum State
+    {
+        PROTO_HDR,
+        FRAME_HDR,
+        FRAME_BODY,
+        ERROR
+    }
+
+    private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
+
+    private final Assembler _assembler;
+
+    private int _maxFrameSize = 4096;
+    private State _state;
+    private ByteBuffer input = null;
+    private int _needed;
+
+    private byte _flags;
+    private SegmentType _type;
+    private byte _track;
+    private int _channel;
+
+    FrameDecoder(final byte[] headerBytes)
+    {
+        _receiver = new ProtocolEventReceiver(headerBytes);
+        this._assembler = new Assembler(_receiver);
+        this._state = State.PROTO_HDR;
+        _needed = 8;
+
+    }
+
+    @Override
+    public Collection<Response<?>> decode(final ByteBuffer buf) throws 
Exception
+    {
+        int limit = buf.limit();
+        int remaining = buf.remaining();
+        while (remaining > 0)
+        {
+            if (remaining >= _needed)
+            {
+                int consumed = _needed;
+                int pos = buf.position();
+                if (input == null)
+                {
+                    buf.limit(pos + _needed);
+                    input = buf;
+                    _state = next(pos);
+                    buf.limit(limit);
+                    buf.position(pos + consumed);
+                }
+                else
+                {
+                    buf.limit(pos + _needed);
+                    input.put(buf);
+                    buf.limit(limit);
+                    input.flip();
+                    _state = next(0);
+                }
+
+                remaining -= consumed;
+                input = null;
+            }
+            else
+            {
+                if (input == null)
+                {
+                    input = ByteBuffer.allocate(_needed);
+                }
+                input.put(buf);
+                _needed -= remaining;
+                remaining = 0;
+            }
+        }
+        return _receiver.getReceivedEvents();
+    }
+
+    private State next(int pos)
+    {
+        input.order(ByteOrder.BIG_ENDIAN);
+
+        switch (_state) {
+            case PROTO_HDR:
+                if (input.get(pos) != 'A' &&
+                    input.get(pos + 1) != 'M' &&
+                    input.get(pos + 2) != 'Q' &&
+                    input.get(pos + 3) != 'P')
+                {
+                    error("bad protocol header: %s", str(input));
+                    return State.ERROR;
+                }
+
+                byte protoClass = input.get(pos + 4);
+                byte instance = input.get(pos + 5);
+                byte major = input.get(pos + 6);
+                byte minor = input.get(pos + 7);
+                _assembler.received(new ProtocolHeader(protoClass, instance, 
major, minor));
+                _needed = Frame.HEADER_SIZE;
+                return State.FRAME_HDR;
+            case FRAME_HDR:
+                _flags = input.get(pos);
+                _type = SegmentType.get(input.get(pos + 1));
+                int size = (0xFFFF & input.getShort(pos + 2));
+                size -= Frame.HEADER_SIZE;
+                _maxFrameSize = 64 * 1024;
+                if (size < 0 || size > (_maxFrameSize - 12))
+                {
+                    error("bad frame size: %d", size);
+                    return State.ERROR;
+                }
+                byte b = input.get(pos + 5);
+                if ((b & 0xF0) != 0) {
+                    error("non-zero reserved bits in upper nibble of " +
+                          "frame header byte 5: '%x'", b);
+                    return State.ERROR;
+                } else {
+                    _track = (byte) (b & 0xF);
+                }
+                _channel = (0xFFFF & input.getShort(pos + 6));
+                if (size == 0)
+                {
+                    Frame frame = new Frame(_flags, _type, _track, _channel, 
EMPTY_BYTE_BUFFER);
+                    _assembler.received(frame);
+                    _needed = Frame.HEADER_SIZE;
+                    return State.FRAME_HDR;
+                }
+                else
+                {
+                    _needed = size;
+                    return State.FRAME_BODY;
+                }
+            case FRAME_BODY:
+                Frame frame = new Frame(_flags, _type, _track, _channel, 
input.slice());
+                _assembler.received(frame);
+                _needed = Frame.HEADER_SIZE;
+                return State.FRAME_HDR;
+            default:
+                throw new IllegalStateException();
+        }
+    }
+
+    private void error(String fmt, Object ... args)
+    {
+        _assembler.received(new ProtocolError(Frame.L1, fmt, args));
+    }
+
+    public void setMaxFrameSize(final int maxFrameSize)
+    {
+        _maxFrameSize = maxFrameSize;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameEncoder.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameEncoder.java
 
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameEncoder.java
new file mode 100644
index 0000000..dfec4f4
--- /dev/null
+++ 
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameEncoder.java
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v0_10.transport.BBEncoder;
+import org.apache.qpid.server.protocol.v0_10.transport.Method;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolEvent;
+import org.apache.qpid.server.transport.ByteBufferSender;
+import org.apache.qpid.tests.protocol.OutputEncoder;
+
+public class FrameEncoder implements OutputEncoder
+{
+    @Override
+    public ByteBuffer encode(final Object msg)
+    {
+        if (msg instanceof ProtocolEvent)
+        {
+            final List<ByteBuffer> buffers = new ArrayList<>();
+            final AtomicInteger totalSize = new AtomicInteger();
+            Disassembler disassembler = new Disassembler(new ByteBufferSender()
+            {
+                @Override
+                public boolean isDirectBufferPreferred()
+                {
+                    return false;
+                }
+
+                @Override
+                public void send(final QpidByteBuffer msg)
+                {
+                    int remaining = msg.remaining();
+                    byte[] data = new byte[remaining];
+                    ByteBuffer byteBuffer = ByteBuffer.wrap(data);
+                    msg.get(data);
+                    buffers.add(byteBuffer);
+                    totalSize.addAndGet(remaining);
+                }
+
+                @Override
+                public void flush()
+                {
+
+                }
+
+                @Override
+                public void close()
+                {
+
+                }
+            }, 512);
+
+            disassembler.send((ProtocolEvent) msg);
+            ByteBuffer data = ByteBuffer.allocate(totalSize.get());
+            for (ByteBuffer buffer : buffers)
+            {
+                data.put(buffer);
+            }
+            data.flip();
+            return data;
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameTransport.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameTransport.java
 
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameTransport.java
new file mode 100644
index 0000000..3b7849c
--- /dev/null
+++ 
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameTransport.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import java.net.InetSocketAddress;
+
+import org.apache.qpid.server.protocol.v0_10.ProtocolEngineCreator_0_10;
+import org.apache.qpid.tests.protocol.AbstractFrameTransport;
+
+
+public class FrameTransport extends AbstractFrameTransport<Interaction>
+{
+    private final byte[] _protocolHeader;
+
+    public FrameTransport(final InetSocketAddress brokerAddress)
+    {
+        super(brokerAddress, new FrameDecoder(new 
ProtocolEngineCreator_0_10().getHeaderIdentifier()), new FrameEncoder());
+        _protocolHeader = new 
ProtocolEngineCreator_0_10().getHeaderIdentifier();
+    }
+
+    @Override
+    public byte[] getProtocolHeader()
+    {
+        return _protocolHeader;
+    }
+
+    @Override
+    public Interaction newInteraction()
+    {
+        return new Interaction(this);
+    }
+
+    @Override
+    public FrameTransport connect()
+    {
+        super.connect();
+        return this;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
 
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
new file mode 100644
index 0000000..5d53a89
--- /dev/null
+++ 
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
@@ -0,0 +1,138 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.server.protocol.v0_10.transport.BBDecoder;
+import org.apache.qpid.server.protocol.v0_10.transport.BBEncoder;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionOpenOk;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionStart;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionTune;
+import org.apache.qpid.server.protocol.v0_10.transport.Method;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionAttached;
+import org.apache.qpid.tests.protocol.AbstractFrameTransport;
+import org.apache.qpid.tests.protocol.AbstractInteraction;
+
+public class Interaction extends AbstractInteraction<Interaction>
+{
+    private ConnectionInteraction _connectionInteraction;
+    private SessionInteraction _sessionInteraction;
+    private MessageInteraction _messageInteraction;
+    private ExecutionInteraction _executionInteraction;
+    private int _channelId;
+    private TxInteraction _txInteraction;
+
+    public Interaction(final AbstractFrameTransport frameTransport)
+    {
+        super(frameTransport);
+        _connectionInteraction = new ConnectionInteraction(this);
+        _sessionInteraction = new SessionInteraction(this);
+        _messageInteraction = new MessageInteraction(this);
+        _executionInteraction = new ExecutionInteraction(this);
+        _txInteraction = new TxInteraction(this);
+    }
+
+    @Override
+    protected byte[] getProtocolHeader()
+    {
+        return getTransport().getProtocolHeader();
+    }
+
+    public <T extends Method> Interaction sendPerformative(final T 
performative) throws Exception
+    {
+        performative.setChannel(_channelId);
+        sendPerformativeAndChainFuture(copyPerformative(performative));
+        return this;
+    }
+
+    public ConnectionInteraction connection()
+    {
+        return _connectionInteraction;
+    }
+
+    private <T extends Method> T copyPerformative(final T src)
+    {
+        T dst = (T) Method.create(src.getStructType());
+        final BBEncoder encoder = new BBEncoder(4096);
+        encoder.init();
+        src.write(encoder);
+        ByteBuffer buffer = encoder.buffer();
+
+        final BBDecoder decoder = new BBDecoder();
+        decoder.init(buffer);
+        dst.read(decoder);
+        return dst;
+    }
+
+    public Interaction openAnonymousConnection() throws Exception
+    {
+        this.negotiateProtocol().consumeResponse()
+            .consumeResponse(ConnectionStart.class)
+            
.connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk()
+            .consumeResponse(ConnectionTune.class)
+            .connection().tuneOk()
+            .connection().open()
+            .consumeResponse(ConnectionOpenOk.class);
+        return this;
+    }
+
+    public SessionInteraction session()
+    {
+        return _sessionInteraction;
+    }
+
+    public int getChannelId()
+    {
+        return _channelId;
+    }
+
+    public Interaction channelId(final int channelId)
+    {
+        _channelId = channelId;
+        return this;
+    }
+
+    public Interaction attachSession(final byte[] sessionName) throws Exception
+    {
+        this.session()
+            .attachName(sessionName)
+            .attach()
+            .consumeResponse(SessionAttached.class)
+            .session().commandPointCommandId(0).commandPoint();
+        return this;
+    }
+
+    public MessageInteraction message()
+    {
+        return _messageInteraction;
+    }
+
+    public ExecutionInteraction execution()
+    {
+        return _executionInteraction;
+    }
+
+    public TxInteraction tx()
+    {
+        return _txInteraction;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java
 
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java
new file mode 100644
index 0000000..4660c86
--- /dev/null
+++ 
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java
@@ -0,0 +1,147 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import org.apache.qpid.server.protocol.v0_10.transport.MessageAccept;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageAcquireMode;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageCreditUnit;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageFlow;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageSubscribe;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageTransfer;
+import org.apache.qpid.server.protocol.v0_10.transport.RangeSet;
+
+public class MessageInteraction
+{
+    private final Interaction _interaction;
+    private MessageTransfer _transfer;
+    private MessageSubscribe _subscribe;
+    private MessageFlow _flow;
+    private MessageAccept _accept;
+
+    public MessageInteraction(final Interaction interaction)
+    {
+        _interaction = interaction;
+        _transfer = new MessageTransfer();
+        _subscribe = new MessageSubscribe();
+        _flow = new MessageFlow();
+        _accept = new MessageAccept();
+    }
+
+    public MessageInteraction transferId(final int id)
+    {
+        _transfer.setId(id);
+        return this;
+    }
+
+    public MessageInteraction transferDesitnation(final String destination)
+    {
+        _transfer.setDestination(destination);
+        return this;
+    }
+
+    public Interaction transfer() throws Exception
+    {
+        _interaction.sendPerformative(_transfer);
+        return _interaction;
+    }
+
+    public MessageInteraction subscribeQueue(final String queueName)
+    {
+        _subscribe.setQueue(queueName);
+        return this;
+    }
+
+    public MessageInteraction subscribeId(final int id)
+    {
+        _subscribe.setId(id);
+        return this;
+    }
+
+    public Interaction subscribe() throws Exception
+    {
+        return _interaction.sendPerformative(_subscribe);
+    }
+
+    public MessageInteraction subscribeDestination(final String destination)
+    {
+        _subscribe.setDestination(destination);
+        return this;
+    }
+
+    public Interaction flow() throws Exception
+    {
+        return _interaction.sendPerformative(_flow);
+    }
+
+    public MessageInteraction flowId(final int id)
+    {
+        _flow.setId(id);
+        return this;
+    }
+
+    public MessageInteraction flowDestination(final String destination)
+    {
+        _flow.setDestination(destination);
+        return this;
+    }
+
+    public MessageInteraction flowUnit(final MessageCreditUnit unit)
+    {
+        _flow.setUnit(unit);
+        return this;
+    }
+
+    public MessageInteraction flowValue(final long value)
+    {
+        _flow.setValue(value);
+        return this;
+    }
+
+    public MessageInteraction subscribeAcceptMode(final MessageAcceptMode 
acceptMode)
+    {
+        _subscribe.setAcceptMode(acceptMode);
+        return this;
+    }
+
+    public MessageInteraction subscribeAcquireMode(final MessageAcquireMode 
acquireMode)
+    {
+        _subscribe.setAcquireMode(acquireMode);
+        return this;
+    }
+
+    public Interaction accept() throws Exception
+    {
+        return _interaction.sendPerformative(_accept);
+    }
+
+    public MessageInteraction acceptId(final int id)
+    {
+        _accept.setId(id);
+        return this;
+    }
+
+    public MessageInteraction acceptTransfers(final RangeSet transfers)
+    {
+        _accept.setTransfers(transfers);
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/PerformativeResponse.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/PerformativeResponse.java
 
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/PerformativeResponse.java
new file mode 100644
index 0000000..701e92c
--- /dev/null
+++ 
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/PerformativeResponse.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import org.apache.qpid.server.protocol.v0_10.transport.Method;
+import org.apache.qpid.tests.protocol.Response;
+
+public class PerformativeResponse implements Response<Method>
+{
+    private Method _method;
+
+    public PerformativeResponse(final Method method)
+    {
+        _method = method;
+    }
+
+    @Override
+    public Method getBody()
+    {
+        return _method;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "PerformativeResponse{" +
+               "_method=" + _method +
+               '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ProtocolEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ProtocolEventReceiver.java
 
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ProtocolEventReceiver.java
new file mode 100644
index 0000000..37eb66e
--- /dev/null
+++ 
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ProtocolEventReceiver.java
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.qpid.server.protocol.v0_10.transport.Method;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolError;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolEvent;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolHeader;
+import org.apache.qpid.tests.protocol.HeaderResponse;
+import org.apache.qpid.tests.protocol.Response;
+
+public class ProtocolEventReceiver
+{
+    private Queue<Response<?>> _events = new ConcurrentLinkedQueue<>();
+    private final byte[] _headerBytes;
+
+    public ProtocolEventReceiver(final byte[] headerBytes)
+    {
+        _headerBytes = headerBytes;
+    }
+
+    void received(ProtocolEvent msg)
+    {
+        if (msg instanceof ProtocolHeader)
+        {
+            _events.add(new HeaderResponse(_headerBytes));
+        }
+        else if (msg instanceof Method)
+        {
+            _events.add(new PerformativeResponse((Method) msg));
+        }
+        else if (msg instanceof ProtocolError)
+        {
+            _events.add(new ErrorResponse((ProtocolError) msg));
+        }
+    }
+
+    public Collection<Response<?>> getReceivedEvents()
+    {
+        Collection<Response<?>> results = new ArrayList<>(_events);
+        _events.removeAll(results);
+        return results;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/SessionInteraction.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/SessionInteraction.java
 
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/SessionInteraction.java
new file mode 100644
index 0000000..fce711d
--- /dev/null
+++ 
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/SessionInteraction.java
@@ -0,0 +1,89 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import org.apache.qpid.server.protocol.v0_10.transport.Method;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionAttach;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionCommandPoint;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionDetach;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionFlush;
+
+public class SessionInteraction
+{
+    private final Interaction _interaction;
+    private SessionAttach _attach;
+    private SessionDetach _detach;
+    private SessionCommandPoint _commandPoint;
+    private SessionFlush _flush;
+
+    public SessionInteraction(final Interaction interaction)
+    {
+        _interaction = interaction;
+        _attach = new SessionAttach();
+        _detach = new SessionDetach();
+        _commandPoint = new SessionCommandPoint();
+        _flush = new SessionFlush();
+    }
+
+    public Interaction attach() throws Exception
+    {
+        return _interaction.sendPerformative(_attach);
+    }
+
+    public SessionInteraction attachName(final byte[] name)
+    {
+        _attach.setName(name);
+        return this;
+    }
+
+    public Interaction detach() throws Exception
+    {
+        return _interaction.sendPerformative(_detach);
+    }
+
+    public SessionInteraction detachName(final byte[] sessionName)
+    {
+        _detach.setName(sessionName);
+        return this;
+    }
+
+    public Interaction commandPoint() throws Exception
+    {
+        return _interaction.sendPerformative(_commandPoint);
+    }
+
+    public SessionInteraction commandPointCommandId(final int commandId)
+    {
+        _commandPoint.setCommandId(commandId);
+        return this;
+    }
+
+    public Interaction flush() throws Exception
+    {
+        return _interaction.sendPerformative(_flush);
+    }
+
+    public SessionInteraction flushCompleted()
+    {
+        _flush.setCompleted(true);
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/TxInteraction.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/TxInteraction.java
 
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/TxInteraction.java
new file mode 100644
index 0000000..14c9912
--- /dev/null
+++ 
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/TxInteraction.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import org.apache.qpid.server.protocol.v0_10.transport.TxCommit;
+import org.apache.qpid.server.protocol.v0_10.transport.TxSelect;
+
+public class TxInteraction
+{
+    private final Interaction _interaction;
+    private final TxSelect _select;
+    private final TxCommit _commit;
+
+    public TxInteraction(final Interaction interaction)
+    {
+        _interaction = interaction;
+        _select = new TxSelect();
+        _commit = new TxCommit();
+    }
+
+    public Interaction select() throws Exception
+    {
+        return _interaction.sendPerformative(_select);
+    }
+
+    public TxInteraction selectId(final int id)
+    {
+        _select.setId(id);
+        return this;
+    }
+
+    public TxInteraction commitId(final int id)
+    {
+        _commit.setId(id);
+        return this;
+    }
+
+    public Interaction commit() throws Exception
+    {
+        return _interaction.sendPerformative(_commit);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/resources/config-protocol-tests-0-10.json
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-0-10/src/main/resources/config-protocol-tests-0-10.json
 
b/systests/protocol-tests-amqp-0-10/src/main/resources/config-protocol-tests-0-10.json
new file mode 100644
index 0000000..69387fb
--- /dev/null
+++ 
b/systests/protocol-tests-amqp-0-10/src/main/resources/config-protocol-tests-0-10.json
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+{
+  "name" : "${broker.name}",
+  "modelVersion" : "7.0",
+  "authenticationproviders" : [ {
+    "name" : "anon",
+    "type" : "Anonymous"
+  }, {
+    "name" : "plain",
+    "type" : "Plain",
+    "secureOnlyMechanisms" : [],
+    "users" : [ {
+      "name" : "admin",
+      "type" : "managed",
+      "password" : "admin"
+    }, {
+      "name" : "guest",
+      "type" : "managed",
+      "password" : "guest"
+    } ]
+  } ],
+  "ports" : [ {
+    "name" : "AMQP",
+    "type" : "AMQP",
+    "authenticationProvider" : "plain",
+    "port" : "0",
+    "protocols" : [ "AMQP_0_10" ],
+    "virtualhostaliases" : [ {
+      "name" : "defaultAlias",
+      "type" : "defaultAlias"
+    }, {
+      "name" : "hostnameAlias",
+      "type" : "hostnameAlias"
+    }, {
+      "name" : "nameAlias",
+      "type" : "nameAlias"
+    } ]
+  }, {
+    "name" : "ANONYMOUS_AMQP",
+    "type" : "AMQP",
+    "authenticationProvider" : "anon",
+    "port" : "0",
+    "protocols" : [ "AMQP_0_10" ],
+    "virtualhostaliases" : [ {
+      "name" : "defaultAlias",
+      "type" : "defaultAlias",
+      "durable" : true
+    }, {
+      "name" : "hostnameAlias",
+      "type" : "hostnameAlias",
+      "durable" : true
+    }, {
+      "name" : "nameAlias",
+      "type" : "nameAlias",
+      "durable" : true
+    } ]
+  } ],
+  "virtualhostnodes" : []
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java
 
b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java
new file mode 100644
index 0000000..1072f7c
--- /dev/null
+++ 
b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java
@@ -0,0 +1,214 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.Assume.assumeThat;
+
+import java.net.InetSocketAddress;
+
+import org.hamcrest.core.IsEqual;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionClose;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionOpenOk;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionSecure;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionStart;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionTune;
+import org.apache.qpid.tests.protocol.ChannelClosedResponse;
+import org.apache.qpid.tests.protocol.HeaderResponse;
+import org.apache.qpid.tests.protocol.Response;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+public class ConnectionTest extends BrokerAdminUsingTestBase
+{
+    private static final String DEFAULT_LOCALE = "en_US";
+    private InetSocketAddress _brokerAddress;
+
+    @Before
+    public void setUp()
+    {
+        _brokerAddress = 
getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+    }
+
+    @Test
+    @SpecificationTest(section = "4.3. Version Negotiation",
+            description = "When the client opens a new socket connection to an 
AMQP server,"
+                          + " it MUST send a protocol header with the client's 
preferred protocol version."
+                          + "If the requested protocol version is supported, 
the server MUST send its own protocol"
+                          + " header with the requested version to the socket, 
and then implement the protocol accordingly")
+    public void versionNegotiation() throws Exception
+    {
+        try(FrameTransport transport = new 
FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            Response<?> response = 
interaction.negotiateProtocol().consumeResponse().getLatestResponse();
+            assertThat(response, is(instanceOf(HeaderResponse.class)));
+            assertThat(response.getBody(), 
is(IsEqual.equalTo(transport.getProtocolHeader())));
+
+            ConnectionStart connectionStart = 
interaction.consumeResponse().getLatestResponse(ConnectionStart.class);
+            assertThat(connectionStart.getMechanisms(), is(notNullValue()));
+            assertThat(connectionStart.getMechanisms(), 
contains(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS));
+            assertThat(connectionStart.getLocales(), is(notNullValue()));
+            assertThat(connectionStart.getLocales(), contains(DEFAULT_LOCALE));
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "9.connection.start-ok",
+            description = "An AMQP client MUST handle incoming 
connection.start controls.")
+    public void startOk() throws Exception
+    {
+        try(FrameTransport transport = new 
FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse()
+                       .consumeResponse(ConnectionStart.class)
+                       
.connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk()
+                       
.consumeResponse().getLatestResponse(ConnectionTune.class);
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "9.connection.tune-ok",
+            description = "This control sends the client's connection tuning 
parameters to the server."
+                          + " Certain fields are negotiated, others provide 
capability information.")
+    public void tuneOkAndOpen() throws Exception
+    {
+        try(FrameTransport transport = new 
FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse()
+                       .consumeResponse(ConnectionStart.class)
+                       
.connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk()
+                       .consumeResponse(ConnectionTune.class)
+                       .connection().tuneOk()
+                       .connection().open()
+                       
.consumeResponse().getLatestResponse(ConnectionOpenOk.class);
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "9",
+            description = "open-connection = C:protocol-header S:START 
C:START-OK *challenge S:TUNE C:TUNE-OK C:OPEN S:OPEN-OK")
+    public void authenticationBypassBySendingTuneOk() throws Exception
+    {
+        InetSocketAddress brokerAddress = 
getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+        try(FrameTransport transport = new 
FrameTransport(brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse()
+                       .consumeResponse(ConnectionStart.class)
+                       .connection().tuneOk()
+                       .connection().open()
+                       
.consumeResponse().getLatestResponse(ConnectionClose.class);
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "9",
+            description = "open-connection = C:protocol-header S:START 
C:START-OK *challenge S:TUNE C:TUNE-OK C:OPEN S:OPEN-OK")
+    public void authenticationBypassBySendingOpen() throws Exception
+    {
+        InetSocketAddress brokerAddress = 
getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+        try(FrameTransport transport = new 
FrameTransport(brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            
interaction.negotiateProtocol().consumeResponse().consumeResponse(ConnectionStart.class)
+                       .connection().open()
+                       
.consumeResponse().getLatestResponse(ConnectionClose.class);
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "9",
+            description = "open-connection = C:protocol-header S:START 
C:START-OK *challenge S:TUNE C:TUNE-OK C:OPEN S:OPEN-OK")
+    public void authenticationBypassAfterSendingStartOk() throws Exception
+    {
+        InetSocketAddress brokerAddress = 
getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+        try(FrameTransport transport = new 
FrameTransport(brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse()
+                       .consumeResponse(ConnectionStart.class)
+                       
.connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_PLAIN).startOk().consumeResponse(ConnectionSecure.class)
+                       .connection().tuneOk()
+                       .connection().open()
+                       .consumeResponse(ConnectionClose.class, 
ChannelClosedResponse.class);
+        }
+    }
+
+
+    @Test
+    @SpecificationTest(section = "9.connection.tune-ok.minimum",
+            description = "[...] the minimum negotiated value for 
max-frame-size is also MIN-MAX-FRAME-SIZE [4096]")
+    public void tooSmallFrameSize() throws Exception
+    {
+        try (FrameTransport transport = new 
FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            ConnectionTune response = 
interaction.negotiateProtocol().consumeResponse()
+                                                 
.consumeResponse(ConnectionStart.class)
+                                                 
.connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk()
+                                                 
.consumeResponse().getLatestResponse(ConnectionTune.class);
+
+            interaction.connection().tuneOkChannelMax(response.getChannelMax())
+                                    .tuneOkMaxFrameSize(1024)
+                                    .tuneOk()
+                       .connection().open()
+                       .consumeResponse(ConnectionClose.class, 
ChannelClosedResponse.class);
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "9.connection.tune-ok.max-frame-size",
+            description = "If the client specifies a channel max that is 
higher than the value provided by the server,"
+                          + " the server MUST close the connection without 
attempting a negotiated close."
+                          + " The server may report the error in some fashion 
to assist implementers.")
+    public void tooLargeFrameSize() throws Exception
+    {
+        try (FrameTransport transport = new 
FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            ConnectionTune response = 
interaction.negotiateProtocol().consumeResponse()
+                                                 
.consumeResponse(ConnectionStart.class)
+                                                 
.connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk()
+                                                 
.consumeResponse().getLatestResponse(ConnectionTune.class);
+
+            assumeThat(response.hasMaxFrameSize(), is(true));
+            assumeThat(response.getMaxFrameSize(), is(lessThan(0xFFFF)));
+            interaction.connection().tuneOkChannelMax(response.getChannelMax())
+                                    
.tuneOkMaxFrameSize(response.getMaxFrameSize() + 1)
+                                    .tuneOk()
+                       .connection().open()
+                       .consumeResponse(ConnectionClose.class, 
ChannelClosedResponse.class);
+        }
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to