Repository: qpid-broker-j Updated Branches: refs/heads/master a3c00bbfc -> c6d80d80e
[Broker-J][System Tests] Add protocol system test suites for AMQP 0-8,0-9 and 0-9-1 Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/c6d80d80 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/c6d80d80 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/c6d80d80 Branch: refs/heads/master Commit: c6d80d80e685f70653f609951868a9e8f0ffb5a6 Parents: 06e53d7 Author: Alex Rudyy <oru...@apache.org> Authored: Sat Nov 18 01:32:22 2017 +0000 Committer: Alex Rudyy <oru...@apache.org> Committed: Sat Nov 18 01:38:53 2017 +0000 ---------------------------------------------------------------------- .../qpid/server/protocol/v0_8/AMQDecoder.java | 4 +- .../server/protocol/v0_8/ServerDecoder.java | 2 +- .../server/protocol/v0_8/ClientDecoder.java | 2 +- pom.xml | 7 + systests/protocol-tests-amqp-0-8/pom.xml | 109 +++++++ .../tests/protocol/v0_8/BasicInteraction.java | 214 ++++++++++++ .../tests/protocol/v0_8/ChannelInteraction.java | 51 +++ .../qpid/tests/protocol/v0_8/ClientDecoder.java | 323 +++++++++++++++++++ .../protocol/v0_8/ConnectionInteraction.java | 103 ++++++ .../qpid/tests/protocol/v0_8/FrameDecoder.java | 111 +++++++ .../qpid/tests/protocol/v0_8/FrameEncoder.java | 83 +++++ .../tests/protocol/v0_8/FrameTransport.java | 101 ++++++ .../qpid/tests/protocol/v0_8/Interaction.java | 109 +++++++ .../protocol/v0_8/PerformativeResponse.java | 54 ++++ .../tests/protocol/v0_8/QueueInteraction.java | 63 ++++ .../resources/config-protocol-tests-0-8.json | 78 +++++ .../qpid/tests/protocol/v0_8/BasicTest.java | 156 +++++++++ .../qpid/tests/protocol/v0_8/ChannelTest.java | 54 ++++ .../tests/protocol/v0_8/ConnectionTest.java | 183 +++++++++++ .../qpid/tests/protocol/v0_8/QueueTest.java | 67 ++++ 20 files changed, 1870 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQDecoder.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQDecoder.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQDecoder.java index 706f1ae..add9fb1 100644 --- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQDecoder.java +++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQDecoder.java @@ -217,11 +217,11 @@ public abstract class AMQDecoder<T extends MethodProcessor> } - abstract void processMethod(int channelId, + protected abstract void processMethod(int channelId, QpidByteBuffer in) throws AMQFrameDecodingException; - AMQFrameDecodingException newUnknownMethodException(final int classId, + protected AMQFrameDecodingException newUnknownMethodException(final int classId, final int methodId, ProtocolVersion protocolVersion) { http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerDecoder.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerDecoder.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerDecoder.java index 08ccda9..59d4985 100644 --- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerDecoder.java +++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerDecoder.java @@ -46,7 +46,7 @@ public class ServerDecoder extends AMQDecoder<ServerMethodProcessor<? extends Se @Override - void processMethod(int channelId, + protected void processMethod(int channelId, QpidByteBuffer in) throws AMQFrameDecodingException { http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ClientDecoder.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ClientDecoder.java b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ClientDecoder.java index de8afb9..8e02bbb 100644 --- a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ClientDecoder.java +++ b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ClientDecoder.java @@ -86,7 +86,7 @@ public class ClientDecoder extends AMQDecoder<ClientMethodProcessor<? extends Cl } @Override - void processMethod(int channelId, + protected void processMethod(int channelId, QpidByteBuffer in) throws AMQFrameDecodingException { http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 62e7033..e7d58bb 100644 --- a/pom.xml +++ b/pom.xml @@ -197,6 +197,7 @@ <module>systests/systests-utils</module> <module>systests/qpid-systests-jms_2.0</module> <module>systests/protocol-tests-core</module> + <module>systests/protocol-tests-amqp-0-8</module> <module>systests/protocol-tests-amqp-1-0</module> <module>systests/end-to-end-conversion-tests</module> <module>perftests</module> @@ -420,6 +421,12 @@ <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>protocol-tests-amqp-0-8</artifactId> + <version>${project.version}</version> + </dependency> + <!-- External dependencies --> <dependency> <groupId>org.apache.qpid</groupId> http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/pom.xml ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-8/pom.xml b/systests/protocol-tests-amqp-0-8/pom.xml new file mode 100644 index 0000000..3f788db --- /dev/null +++ b/systests/protocol-tests-amqp-0-8/pom.xml @@ -0,0 +1,109 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-systests-parent</artifactId> + <version>7.1.0-SNAPSHOT</version> + <relativePath>../../qpid-systests-parent/pom.xml</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>protocol-tests-amqp-0-8</artifactId> + <name>Apache Qpid Protocol Tests for AMQP 0-8,0-9,0-9-1</name> + <description>Tests for AMQP 0-8,0-9,0-9-1</description> + + <dependencies> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-broker-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-broker-plugins-amqp-0-8-protocol</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-test-utils</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>protocol-tests-core</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-systests-utils</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-broker-plugins-logging-logback</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-broker-plugins-memory-store</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-broker-plugins-derby-store</artifactId> + <optional>true</optional> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-bdbstore</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-library</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-integration</artifactId> + <scope>test</scope> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <systemPropertyVariables> + <qpid.initialConfigurationLocation>classpath:config-protocol-tests-0-8.json</qpid.initialConfigurationLocation> + </systemPropertyVariables> + </configuration> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java new file mode 100644 index 0000000..9ef66ec --- /dev/null +++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java @@ -0,0 +1,214 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.protocol.v0_8; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.qpid.server.protocol.v0_8.AMQShortString; +import org.apache.qpid.server.protocol.v0_8.FieldTable; +import org.apache.qpid.server.protocol.v0_8.transport.AMQFrame; +import org.apache.qpid.server.protocol.v0_8.transport.BasicAckBody; +import org.apache.qpid.server.protocol.v0_8.transport.BasicConsumeBody; +import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties; +import org.apache.qpid.server.protocol.v0_8.transport.BasicPublishBody; +import org.apache.qpid.server.protocol.v0_8.transport.BasicQosBody; +import org.apache.qpid.server.protocol.v0_8.transport.CompositeAMQDataBlock; +import org.apache.qpid.server.protocol.v0_8.transport.ContentBody; +import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody; + +public class BasicInteraction +{ + private final Interaction _interaction; + private String _publishExchange; + private String _publishRoutingKey; + private boolean _publishMandatory; + private boolean _publishImmediate; + private byte[] _content; + private Map<String, Object> _contentHeaderPropertiesHeaders = new HashMap<>(); + private String _contentHeaderPropertiesContentType; + private byte _contentHeaderPropertiesDeliveryMode; + private byte _contentHeaderPropertiesPriority; + private int _qosPrefetchCount; + private long _qosPrefetchSize; + private boolean _qosGlobal; + private String _consumeQueueName; + private String _consumeConsumerTag; + private boolean _consumeNoLocal; + private boolean _consumeNoAck; + private boolean _consumeExclusive; + private boolean _consumeNoWait; + private Map<String, Object> _consumeArguments = new HashMap<>(); + private long _ackDeliveryTag; + private boolean _ackMultiple; + + public BasicInteraction(final Interaction interaction) + { + _interaction = interaction; + } + + public Interaction publish() throws Exception + { + return _interaction.sendPerformative(new BasicPublishBody(0, + AMQShortString.valueOf(_publishExchange), + AMQShortString.valueOf(_publishRoutingKey), + _publishMandatory, + _publishImmediate)); + } + + public BasicInteraction content(final String content) + { + _content = content.getBytes(StandardCharsets.UTF_8); + return this; + } + + public BasicInteraction content(final byte[] content) + { + _content = content; + return this; + } + + public BasicInteraction contentHeaderPropertiesHeaders(final Map<String, Object> messageHeaders) + { + _contentHeaderPropertiesHeaders = messageHeaders; + return this; + } + + public BasicInteraction contentHeaderPropertiesContentType(final String messageContentType) + { + _contentHeaderPropertiesContentType = messageContentType; + return this; + } + + public BasicInteraction contentHeaderPropertiesPriority(final byte priority) + { + _contentHeaderPropertiesPriority = priority; + return this; + } + + public BasicInteraction contentHeaderPropertiesDeliveryMode(final byte deliveryMode) + { + _contentHeaderPropertiesDeliveryMode = deliveryMode; + return this; + } + + + public Interaction publishMessage() throws Exception + { + List<AMQFrame> frames = new ArrayList<>(); + BasicPublishBody publishFrame = new BasicPublishBody(0, + AMQShortString.valueOf(_publishExchange), + AMQShortString.valueOf(_publishRoutingKey), + _publishMandatory, + _publishImmediate); + frames.add(new AMQFrame(_interaction.getChannelId(), publishFrame)); + final BasicContentHeaderProperties basicContentHeaderProperties = new BasicContentHeaderProperties(); + basicContentHeaderProperties.setHeaders(FieldTable.convertToFieldTable(_contentHeaderPropertiesHeaders)); + basicContentHeaderProperties.setContentType(_contentHeaderPropertiesContentType); + basicContentHeaderProperties.setDeliveryMode(_contentHeaderPropertiesDeliveryMode); + basicContentHeaderProperties.setPriority(_contentHeaderPropertiesPriority); + final int contentSize = _content == null ? 0 : _content.length; + ContentHeaderBody contentHeaderBody = new ContentHeaderBody(basicContentHeaderProperties, contentSize); + frames.add(new AMQFrame(_interaction.getChannelId(), contentHeaderBody)); + if (contentSize > 0) + { + final int framePayloadMax = _interaction.getMaximumFrameSize() - 8; + int offset = 0; + do + { + int contentToCopyLength = Math.min(framePayloadMax, contentSize - offset); + ContentBody contentBody = new ContentBody(ByteBuffer.wrap(_content, offset, + contentToCopyLength)); + frames.add(new AMQFrame(_interaction.getChannelId(), contentBody)); + offset += contentToCopyLength; + } + while (offset < contentSize); + } + + CompositeAMQDataBlock frame = new CompositeAMQDataBlock(frames.toArray(new AMQFrame[frames.size()])); + + return _interaction.sendPerformative(frame); + } + + public BasicInteraction publishExchange(final String exchangeName) + { + _publishExchange = exchangeName; + return this; + } + + public BasicInteraction publishRoutingKey(final String queueName) + { + _publishRoutingKey = queueName; + return this; + } + + public BasicInteraction qosPrefetchCount(final int prefetchCount) + { + _qosPrefetchCount = prefetchCount; + return this; + } + + public Interaction qos() throws Exception + { + return _interaction.sendPerformative(new BasicQosBody(_qosPrefetchSize, + _qosPrefetchCount, + _qosGlobal)); + } + + public Interaction consume() throws Exception + { + return _interaction.sendPerformative(new BasicConsumeBody(0, + AMQShortString.valueOf(_consumeQueueName), + AMQShortString.valueOf(_consumeConsumerTag), + _consumeNoLocal, + _consumeNoAck, + _consumeExclusive, + _consumeNoWait, + FieldTable.convertToFieldTable(_consumeArguments))); + } + + public BasicInteraction consumeConsumerTag(final String consumerTag) + { + _consumeConsumerTag = consumerTag; + return this; + } + + public BasicInteraction consumeQueue(final String queueName) + { + _consumeQueueName = queueName; + return this; + } + + public Interaction ack() throws Exception + { + return _interaction.sendPerformative(new BasicAckBody(_ackDeliveryTag, _ackMultiple)); + } + + public BasicInteraction ackDeliveryTag(final long deliveryTag) + { + _ackDeliveryTag = deliveryTag; + return this; + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ChannelInteraction.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ChannelInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ChannelInteraction.java new file mode 100644 index 0000000..51d4426 --- /dev/null +++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ChannelInteraction.java @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.protocol.v0_8; + +import org.apache.qpid.server.protocol.v0_8.AMQShortString; +import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseBody; +import org.apache.qpid.server.protocol.v0_8.transport.ChannelFlowBody; +import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenBody; + +public class ChannelInteraction +{ + private Interaction _interaction; + + public ChannelInteraction(final Interaction interaction) + { + _interaction = interaction; + } + + public Interaction open() throws Exception + { + return _interaction.sendPerformative(new ChannelOpenBody()); + } + + public Interaction close() throws Exception + { + return _interaction.sendPerformative(new ChannelCloseBody(200, AMQShortString.valueOf(""), 0, 0)); + } + + public Interaction flow(final boolean active) throws Exception + { + return _interaction.sendPerformative(new ChannelFlowBody(active)); + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ClientDecoder.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ClientDecoder.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ClientDecoder.java new file mode 100644 index 0000000..2bd9d27 --- /dev/null +++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ClientDecoder.java @@ -0,0 +1,323 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.protocol.v0_8; + +import java.nio.ByteBuffer; + +import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.protocol.ProtocolVersion; +import org.apache.qpid.server.protocol.v0_8.AMQDecoder; +import org.apache.qpid.server.protocol.v0_8.AMQFrameDecodingException; +import org.apache.qpid.server.protocol.v0_8.transport.*; + +public class ClientDecoder extends AMQDecoder<ClientMethodProcessor<? extends ClientChannelMethodProcessor>> +{ + private QpidByteBuffer _incompleteBuffer; + + /** + * Creates a new AMQP decoder. + * + * @param methodProcessor method processor + */ + public ClientDecoder(final ClientMethodProcessor<? extends ClientChannelMethodProcessor> methodProcessor) + { + super(false, methodProcessor); + } + + public void decodeBuffer(ByteBuffer incomingBuffer) throws AMQFrameDecodingException, AMQProtocolVersionException + { + if (_incompleteBuffer == null) + { + QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(incomingBuffer); + final int required = decode(qpidByteBuffer); + if (required != 0) + { + _incompleteBuffer = QpidByteBuffer.allocate(qpidByteBuffer.remaining() + required); + _incompleteBuffer.put(qpidByteBuffer); + } + qpidByteBuffer.dispose(); + } + else + { + if (incomingBuffer.remaining() < _incompleteBuffer.remaining()) + { + _incompleteBuffer.put(incomingBuffer); + } + else + { + _incompleteBuffer.flip(); + final QpidByteBuffer aggregatedBuffer = + QpidByteBuffer.allocate(_incompleteBuffer.remaining() + incomingBuffer.remaining()); + aggregatedBuffer.put(_incompleteBuffer); + aggregatedBuffer.put(incomingBuffer); + aggregatedBuffer.flip(); + final int required = decode(aggregatedBuffer); + + _incompleteBuffer.dispose(); + if (required != 0) + { + _incompleteBuffer = QpidByteBuffer.allocate(aggregatedBuffer.remaining() + required); + _incompleteBuffer.put(aggregatedBuffer); + } + else + { + _incompleteBuffer = null; + } + aggregatedBuffer.dispose(); + } + } + // post-condition: assert(!incomingBuffer.hasRemaining()); + } + + @Override + protected void processMethod(int channelId, + QpidByteBuffer in) + throws AMQFrameDecodingException + { + ClientMethodProcessor<? extends ClientChannelMethodProcessor> methodProcessor = getMethodProcessor(); + ClientChannelMethodProcessor channelMethodProcessor = methodProcessor.getChannelMethodProcessor(channelId); + final int classAndMethod = in.getInt(); + int classId = classAndMethod >> 16; + int methodId = classAndMethod & 0xFFFF; + methodProcessor.setCurrentMethod(classId, methodId); + try + { + switch (classAndMethod) + { + //CONNECTION_CLASS: + case 0x000a000a: + ConnectionStartBody.process(in, methodProcessor); + break; + case 0x000a0014: + ConnectionSecureBody.process(in, methodProcessor); + break; + case 0x000a001e: + ConnectionTuneBody.process(in, methodProcessor); + break; + case 0x000a0029: + ConnectionOpenOkBody.process(in, methodProcessor); + break; + case 0x000a002a: + ConnectionRedirectBody.process(in, methodProcessor); + break; + case 0x000a0032: + if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v0_8)) + { + ConnectionRedirectBody.process(in, methodProcessor); + } + else + { + ConnectionCloseBody.process(in, methodProcessor); + } + break; + case 0x000a0033: + if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v0_8)) + { + throw newUnknownMethodException(classId, methodId, + methodProcessor.getProtocolVersion()); + } + else + { + methodProcessor.receiveConnectionCloseOk(); + } + break; + case 0x000a003c: + if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v0_8)) + { + ConnectionCloseBody.process(in, methodProcessor); + } + else + { + throw newUnknownMethodException(classId, methodId, + methodProcessor.getProtocolVersion()); + } + break; + case 0x000a003d: + if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v0_8)) + { + methodProcessor.receiveConnectionCloseOk(); + } + else + { + throw newUnknownMethodException(classId, methodId, + methodProcessor.getProtocolVersion()); + } + break; + + // CHANNEL_CLASS: + + case 0x0014000b: + ChannelOpenOkBody.process(in, methodProcessor.getProtocolVersion(), channelMethodProcessor); + break; + case 0x00140014: + ChannelFlowBody.process(in, channelMethodProcessor); + break; + case 0x00140015: + ChannelFlowOkBody.process(in, channelMethodProcessor); + break; + case 0x0014001e: + ChannelAlertBody.process(in, channelMethodProcessor); + break; + case 0x00140028: + ChannelCloseBody.process(in, channelMethodProcessor); + break; + case 0x00140029: + channelMethodProcessor.receiveChannelCloseOk(); + break; + + // ACCESS_CLASS: + + case 0x001e000b: + AccessRequestOkBody.process(in, channelMethodProcessor); + break; + + // EXCHANGE_CLASS: + + case 0x0028000b: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveExchangeDeclareOk(); + } + break; + case 0x00280015: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveExchangeDeleteOk(); + } + break; + case 0x00280017: + ExchangeBoundOkBody.process(in, channelMethodProcessor); + break; + + + // QUEUE_CLASS: + + case 0x0032000b: + QueueDeclareOkBody.process(in, channelMethodProcessor); + break; + case 0x00320015: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveQueueBindOk(); + } + break; + case 0x0032001f: + QueuePurgeOkBody.process(in, channelMethodProcessor); + break; + case 0x00320029: + QueueDeleteOkBody.process(in, channelMethodProcessor); + break; + case 0x00320033: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveQueueUnbindOk(); + } + break; + + + // BASIC_CLASS: + + case 0x003c000b: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveBasicQosOk(); + } + break; + case 0x003c0015: + BasicConsumeOkBody.process(in, channelMethodProcessor); + break; + case 0x003c001f: + BasicCancelOkBody.process(in, channelMethodProcessor); + break; + case 0x003c0032: + BasicReturnBody.process(in, channelMethodProcessor); + break; + case 0x003c003c: + BasicDeliverBody.process(in, channelMethodProcessor); + break; + case 0x003c0047: + BasicGetOkBody.process(in, channelMethodProcessor); + break; + case 0x003c0048: + BasicGetEmptyBody.process(in, channelMethodProcessor); + break; + case 0x003c0050: + BasicAckBody.process(in, channelMethodProcessor); + break; + case 0x003c0065: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveBasicRecoverSyncOk(); + } + break; + case 0x003c006f: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveBasicRecoverSyncOk(); + } + break; + case 0x003c0078: + BasicNackBody.process(in, channelMethodProcessor); + break; + + // CONFIRM CLASS: + + case 0x0055000b: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveConfirmSelectOk(); + } + break; + + // TX_CLASS: + + case 0x005a000b: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveTxSelectOk(); + } + break; + case 0x005a0015: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveTxCommitOk(); + } + break; + case 0x005a001f: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveTxRollbackOk(); + } + break; + + default: + throw newUnknownMethodException(classId, methodId, + methodProcessor.getProtocolVersion()); + + } + } + finally + { + methodProcessor.setCurrentMethod(0, 0); + } + } + +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java new file mode 100644 index 0000000..023e7fc --- /dev/null +++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java @@ -0,0 +1,103 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.protocol.v0_8; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.qpid.server.protocol.v0_8.AMQShortString; +import org.apache.qpid.server.protocol.v0_8.FieldTable; +import org.apache.qpid.server.protocol.v0_8.transport.ConnectionOpenBody; +import org.apache.qpid.server.protocol.v0_8.transport.ConnectionStartOkBody; +import org.apache.qpid.server.protocol.v0_8.transport.ConnectionTuneOkBody; + +public class ConnectionInteraction +{ + private final Interaction _interaction; + + private final Map<String, Object> _startOkClientProperties = new HashMap<>(); + private String _startOkMechanism; + private byte[] _startOkResponse; + private String _startOkLocale; + private int _tuneOkChannelMax; + private long _tuneOkFrameMax; + private int _tuneOkHeartbeat; + private String _openVirtualHost; + + public ConnectionInteraction(final Interaction interaction) + { + _interaction = interaction; + } + + + public ConnectionInteraction startOkMechanism(final String startOkMechanism) + { + _startOkMechanism = startOkMechanism; + return this; + } + + + public Interaction startOk() throws Exception + { + return _interaction.sendPerformative(new ConnectionStartOkBody(FieldTable.convertToFieldTable(_startOkClientProperties), + AMQShortString.valueOf(_startOkMechanism), + _startOkResponse, + AMQShortString.valueOf(_startOkLocale))); + } + + public ConnectionInteraction tuneOkChannelMax(final int channelMax) + { + _tuneOkChannelMax = channelMax; + return this; + } + + public ConnectionInteraction tuneOkFrameMax(final long frameMax) + { + _tuneOkFrameMax = frameMax; + return this; + } + + public ConnectionInteraction tuneOkHeartbeat(final int heartbeat) + { + _tuneOkHeartbeat = heartbeat; + return this; + } + + public Interaction tuneOk() throws Exception + { + return _interaction.sendPerformative(new ConnectionTuneOkBody(_tuneOkChannelMax, + _tuneOkFrameMax, + _tuneOkHeartbeat)); + } + + public ConnectionInteraction openVirtualHost(String virtualHost) + { + _openVirtualHost = virtualHost; + return this; + } + + public Interaction open() throws Exception + { + return _interaction.sendPerformative(new ConnectionOpenBody(AMQShortString.valueOf(_openVirtualHost), + null, + false)); + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameDecoder.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameDecoder.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameDecoder.java new file mode 100644 index 0000000..499fe72 --- /dev/null +++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameDecoder.java @@ -0,0 +1,111 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.protocol.v0_8; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.protocol.ProtocolVersion; +import org.apache.qpid.server.protocol.v0_8.transport.AMQDataBlock; +import org.apache.qpid.server.protocol.v0_8.transport.AMQFrame; +import org.apache.qpid.server.protocol.v0_8.transport.FrameCreatingMethodProcessor; +import org.apache.qpid.server.protocol.v0_8.transport.ProtocolInitiation; +import org.apache.qpid.server.transport.ByteBufferSender; +import org.apache.qpid.tests.protocol.HeaderResponse; +import org.apache.qpid.tests.protocol.InputDecoder; +import org.apache.qpid.tests.protocol.Response; + +public class FrameDecoder implements InputDecoder +{ + private final ClientDecoder _clientDecoder; + private final FrameCreatingMethodProcessor _methodProcessor; + + FrameDecoder(ProtocolVersion protocolVersion) + { + _methodProcessor = new FrameCreatingMethodProcessor(protocolVersion); + _clientDecoder = new ClientDecoder(_methodProcessor); + } + + @Override + public Collection<Response<?>> decode(final ByteBuffer inputBuffer) throws Exception + { + _clientDecoder.decodeBuffer(inputBuffer); + + List<AMQDataBlock> receivedFrames = new ArrayList<>(_methodProcessor.getProcessedMethods()); + List<Response<?>> result = new ArrayList<>(); + + for (AMQDataBlock frame : receivedFrames) + { + if (frame instanceof AMQFrame) + { + AMQFrame amqFrame = (AMQFrame) frame; + result.add(new PerformativeResponse(amqFrame.getChannel(), amqFrame.getSize(), amqFrame.getBodyFrame())); + } + else if (frame instanceof ProtocolInitiation) + { + byte[] data = new byte[(int) frame.getSize()]; + frame.writePayload(new ByteBufferSender() + { + @Override + public boolean isDirectBufferPreferred() + { + return false; + } + + @Override + public void send(final QpidByteBuffer msg) + { + msg.copyTo(data); + } + + @Override + public void flush() + { + + } + + @Override + public void close() + { + + } + }); + + result.add(new HeaderResponse(data)); + } + else + { + throw new IllegalArgumentException(String.format("Unexpected data block received %s", frame)); + } + } + _methodProcessor.getProcessedMethods().removeAll(receivedFrames); + return result; + } + + ProtocolVersion getVersion() + { + return _methodProcessor.getProtocolVersion(); + } + +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameEncoder.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameEncoder.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameEncoder.java new file mode 100644 index 0000000..9b471fc --- /dev/null +++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameEncoder.java @@ -0,0 +1,83 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.protocol.v0_8; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.protocol.v0_8.transport.AMQDataBlock; +import org.apache.qpid.server.transport.ByteBufferSender; +import org.apache.qpid.tests.protocol.OutputEncoder; + +public class FrameEncoder implements OutputEncoder +{ + + @Override + public ByteBuffer encode(final Object msg) + { + if (msg instanceof AMQDataBlock) + { + final List<ByteBuffer>buffers = new ArrayList<>(); + ((AMQDataBlock)msg).writePayload(new ByteBufferSender() + { + @Override + public boolean isDirectBufferPreferred() + { + return false; + } + + @Override + public void send(final QpidByteBuffer msg) + { + byte[] data = new byte[msg.remaining()]; + msg.get(data); + buffers.add(ByteBuffer.wrap(data)); + } + + @Override + public void flush() + { + } + + @Override + public void close() + { + + } + }); + int remaining = 0; + for (ByteBuffer byteBuffer: buffers) + { + remaining += byteBuffer.remaining(); + } + ByteBuffer result = ByteBuffer.allocate(remaining); + for (ByteBuffer byteBuffer: buffers) + { + result.put(byteBuffer); + } + result.flip(); + return result; + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java new file mode 100644 index 0000000..52cd7a0 --- /dev/null +++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java @@ -0,0 +1,101 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.protocol.v0_8; + +import java.net.InetSocketAddress; + +import org.apache.qpid.server.model.Protocol; +import org.apache.qpid.server.plugin.ProtocolEngineCreator; +import org.apache.qpid.server.plugin.QpidServiceLoader; +import org.apache.qpid.server.protocol.ProtocolVersion; + + +public class FrameTransport extends org.apache.qpid.tests.protocol.FrameTransport +{ + private final byte[] _protocolHeader; + private ProtocolVersion _protocolVersion; + + public FrameTransport(final InetSocketAddress brokerAddress) + { + this(brokerAddress, Protocol.AMQP_0_9_1); + } + public FrameTransport(final InetSocketAddress brokerAddress, Protocol protocol) + { + super(brokerAddress, new FrameDecoder(getProtocolVersion(protocol)), new FrameEncoder()); + _protocolVersion = getProtocolVersion(protocol); + byte[] protocolHeader = null; + for(ProtocolEngineCreator installedEngine : (new QpidServiceLoader()).instancesOf(ProtocolEngineCreator.class)) + { + if (installedEngine.getVersion() == protocol) + { + protocolHeader = installedEngine.getHeaderIdentifier(); + } + } + + if (protocolHeader == null) + { + throw new IllegalArgumentException(String.format("Unsupported protocol %s", protocol)); + } + _protocolHeader = protocolHeader; + } + + @Override + public FrameTransport connect() + { + super.connect(); + return this; + } + + public Interaction newInteraction() + { + return new Interaction(this); + } + + public byte[] getProtocolHeader() + { + return _protocolHeader; + } + + public ProtocolVersion getProtocolVersion() + { + return _protocolVersion; + } + + public static ProtocolVersion getProtocolVersion(Protocol protocol) + { + final ProtocolVersion protocolVersion; + switch (protocol) + { + case AMQP_0_8: + protocolVersion = ProtocolVersion.v0_8; + break; + case AMQP_0_9_1: + protocolVersion = ProtocolVersion.v0_91; + break; + case AMQP_0_9: + protocolVersion = ProtocolVersion.v0_9; + break; + default: + throw new IllegalArgumentException(String.format("Unsupported protocol %s", protocol)); + } + return protocolVersion; + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java new file mode 100644 index 0000000..0b62770 --- /dev/null +++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java @@ -0,0 +1,109 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.protocol.v0_8; + +import org.apache.qpid.server.protocol.v0_8.transport.AMQBody; +import org.apache.qpid.server.protocol.v0_8.transport.AMQDataBlock; +import org.apache.qpid.server.protocol.v0_8.transport.AMQFrame; +import org.apache.qpid.server.protocol.v0_8.transport.ConnectionOpenOkBody; +import org.apache.qpid.server.protocol.v0_8.transport.ConnectionStartBody; +import org.apache.qpid.server.protocol.v0_8.transport.ConnectionTuneBody; + +public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Interaction> +{ + + private int _channelId; + private int _maximumPayloadSize = 512; + + Interaction(final FrameTransport transport) + { + super(transport); + } + + @Override + protected byte[] getProtocolHeader() + { + return getTransport().getProtocolHeader(); + } + + @Override + protected Interaction getInteraction() + { + return this; + } + + public Interaction sendPerformative(final AMQBody amqBody) throws Exception + { + return sendPerformative(getChannelId(), amqBody); + } + + public Interaction sendPerformative(int channel, final AMQBody amqBody) throws Exception + { + final AMQFrame frameBody = new AMQFrame(channel, amqBody); + sendPerformativeAndChainFuture(frameBody, false); + return this; + } + + public Interaction sendPerformative(final AMQDataBlock dataBlock) throws Exception + { + sendPerformativeAndChainFuture(dataBlock, false); + return this; + } + + public Interaction openAnonymousConnection() throws Exception + { + return this.negotiateProtocol().consumeResponse(ConnectionStartBody.class) + .connection().startOkMechanism("ANONYMOUS").startOk().consumeResponse(ConnectionTuneBody.class) + .connection().tuneOk() + .connection().open().consumeResponse(ConnectionOpenOkBody.class); + + } + + public ConnectionInteraction connection() + { + return new ConnectionInteraction(this); + } + + public ChannelInteraction channel() + { + return new ChannelInteraction(this); + } + + public QueueInteraction queue() + { + return new QueueInteraction(this); + } + + public int getChannelId() + { + return _channelId; + } + + public int getMaximumFrameSize() + { + return _maximumPayloadSize; + } + + public BasicInteraction basic() + { + return new BasicInteraction(this); + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/PerformativeResponse.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/PerformativeResponse.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/PerformativeResponse.java new file mode 100644 index 0000000..66871e9 --- /dev/null +++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/PerformativeResponse.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.tests.protocol.v0_8; + +import org.apache.qpid.server.protocol.v0_8.transport.AMQBody; +import org.apache.qpid.server.protocol.v0_8.transport.AMQFrame; +import org.apache.qpid.tests.protocol.Response; + +public class PerformativeResponse implements Response<AMQBody> +{ + private final int _channel; + private final long _size; + private final AMQBody _body; + + public PerformativeResponse(int channel, long size, final AMQBody body) + { + _channel = channel; + _size = size; + _body = body; + } + + @Override + public AMQBody getBody() + { + return _body; + } + + @Override + public String toString() + { + return "PerformativeResponse{" + + "_channel=" + _channel + + ", _size=" + _size + + ", _body=" + _body + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/QueueInteraction.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/QueueInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/QueueInteraction.java new file mode 100644 index 0000000..6e86385 --- /dev/null +++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/QueueInteraction.java @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.protocol.v0_8; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.qpid.server.protocol.v0_8.AMQShortString; +import org.apache.qpid.server.protocol.v0_8.FieldTable; +import org.apache.qpid.server.protocol.v0_8.transport.QueueDeclareBody; + +public class QueueInteraction +{ + private Interaction _interaction; + private String _declareName; + private boolean _declarePassive; + private boolean _declareDurable; + private boolean _declareExclusive; + private boolean _declareAutoDelete; + private boolean _declareNowait; + private Map<String, Object> _declareArguments = new HashMap<>(); + + public QueueInteraction(final Interaction interaction) + { + _interaction = interaction; + } + + public QueueInteraction declareName(String name) + { + _declareName = name; + return this; + } + + public Interaction declare() throws Exception + { + return _interaction.sendPerformative(new QueueDeclareBody(0, + AMQShortString.valueOf(_declareName), + _declarePassive, + _declareDurable, + _declareExclusive, + _declareAutoDelete, + _declareNowait, + FieldTable.convertToFieldTable(_declareArguments))); + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/main/resources/config-protocol-tests-0-8.json ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-8/src/main/resources/config-protocol-tests-0-8.json b/systests/protocol-tests-amqp-0-8/src/main/resources/config-protocol-tests-0-8.json new file mode 100644 index 0000000..d3738c9 --- /dev/null +++ b/systests/protocol-tests-amqp-0-8/src/main/resources/config-protocol-tests-0-8.json @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +{ + "name" : "${broker.name}", + "modelVersion" : "7.0", + "authenticationproviders" : [ { + "name" : "anon", + "type" : "Anonymous" + }, { + "name" : "plain", + "type" : "Plain", + "secureOnlyMechanisms" : [], + "users" : [ { + "name" : "admin", + "type" : "managed", + "password" : "admin" + }, { + "name" : "guest", + "type" : "managed", + "password" : "guest" + } ] + } ], + "ports" : [ { + "name" : "AMQP", + "type" : "AMQP", + "authenticationProvider" : "plain", + "port" : "0", + "protocols" : [ "AMQP_0_8", "AMQP_0_9", "AMQP_0_9_1" ], + "virtualhostaliases" : [ { + "name" : "defaultAlias", + "type" : "defaultAlias" + }, { + "name" : "hostnameAlias", + "type" : "hostnameAlias" + }, { + "name" : "nameAlias", + "type" : "nameAlias" + } ] + }, { + "name" : "ANONYMOUS_AMQP", + "type" : "AMQP", + "authenticationProvider" : "anon", + "port" : "0", + "protocols" : [ "AMQP_0_8", "AMQP_0_9", "AMQP_0_9_1" ], + "virtualhostaliases" : [ { + "name" : "defaultAlias", + "type" : "defaultAlias", + "durable" : true + }, { + "name" : "hostnameAlias", + "type" : "hostnameAlias", + "durable" : true + }, { + "name" : "nameAlias", + "type" : "nameAlias", + "durable" : true + } ] + } ], + "virtualhostnodes" : [] +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java new file mode 100644 index 0000000..eb51925 --- /dev/null +++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java @@ -0,0 +1,156 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.protocol.v0_8; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.protocol.v0_8.AMQShortString; +import org.apache.qpid.server.protocol.v0_8.FieldTable; +import org.apache.qpid.server.protocol.v0_8.transport.BasicConsumeOkBody; +import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties; +import org.apache.qpid.server.protocol.v0_8.transport.BasicDeliverBody; +import org.apache.qpid.server.protocol.v0_8.transport.BasicQosOkBody; +import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseOkBody; +import org.apache.qpid.server.protocol.v0_8.transport.ChannelFlowOkBody; +import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody; +import org.apache.qpid.server.protocol.v0_8.transport.ContentBody; +import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody; +import org.apache.qpid.tests.protocol.SpecificationTest; +import org.apache.qpid.tests.utils.BrokerAdmin; +import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; + +public class BasicTest extends BrokerAdminUsingTestBase +{ + private InetSocketAddress _brokerAddress; + + @Before + public void setUp() + { + _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); + getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME); + } + + @Test + @SpecificationTest(section = "1.8.3.7", description = "publish a message") + public void publishMessage() throws Exception + { + try(FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + interaction.openAnonymousConnection() + .channel().open().consumeResponse(ChannelOpenOkBody.class) + .basic().contentHeaderPropertiesContentType("text/plain") + .contentHeaderPropertiesHeaders(Collections.singletonMap("test", "testValue")) + .contentHeaderPropertiesDeliveryMode((byte)1) + .contentHeaderPropertiesPriority((byte)1) + .publishExchange("") + .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME) + .content("Test") + .publishMessage() + .channel().close() + .consumeResponse(ChannelCloseOkBody.class); + + assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1))); + } + } + + + @Test + @SpecificationTest(section = "1.8.3.3", description = " start a queue consumer") + public void consumeMessage() throws Exception + { + try(FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + String messageContent = "Test"; + String consumerTag = "A"; + String queueName = BrokerAdmin.TEST_QUEUE_NAME; + Map<String, Object> messageHeaders = Collections.singletonMap("test", "testValue"); + String messageContentType = "text/plain"; + byte deliveryMode = (byte) 1; + byte priority = (byte) 2; + interaction.openAnonymousConnection() + .channel().open().consumeResponse(ChannelOpenOkBody.class) + .basic().qosPrefetchCount(1).qos().consumeResponse(BasicQosOkBody.class) + .basic().consumeConsumerTag(consumerTag) + .consumeQueue(queueName) + .consume().consumeResponse(BasicConsumeOkBody.class) + .channel().flow(true).consumeResponse(ChannelFlowOkBody.class) + .basic().contentHeaderPropertiesContentType(messageContentType) + .contentHeaderPropertiesHeaders(messageHeaders) + .contentHeaderPropertiesDeliveryMode(deliveryMode) + .contentHeaderPropertiesPriority(priority) + .publishExchange("") + .publishRoutingKey(queueName) + .content(messageContent) + .publishMessage() + .consumeResponse(BasicDeliverBody.class); + + BasicDeliverBody delivery = interaction.getLatestResponse(BasicDeliverBody.class); + assertThat(delivery.getConsumerTag(), is(equalTo(AMQShortString.valueOf(consumerTag)))); + assertThat(delivery.getConsumerTag(), is(notNullValue())); + assertThat(delivery.getRedelivered(), is(equalTo(false))); + assertThat(delivery.getExchange(), is(nullValue())); + assertThat(delivery.getRoutingKey(), is(equalTo(AMQShortString.valueOf(queueName)))); + + ContentHeaderBody header = + interaction.consumeResponse(ContentHeaderBody.class).getLatestResponse(ContentHeaderBody.class); + + assertThat(header.getBodySize(), is(equalTo((long)messageContent.length()))); + BasicContentHeaderProperties properties = header.getProperties(); + Map<String, Object> receivedHeaders = new HashMap<>(FieldTable.convertToMap(properties.getHeaders())); + assertThat(receivedHeaders, is(equalTo(new HashMap<>(messageHeaders)))); + assertThat(properties.getContentTypeAsString(), is(equalTo(messageContentType))); + assertThat(properties.getPriority(), is(equalTo(priority))); + assertThat(properties.getDeliveryMode(), is(equalTo(deliveryMode))); + + ContentBody content = interaction.consumeResponse(ContentBody.class).getLatestResponse(ContentBody.class); + + QpidByteBuffer payload = content.getPayload(); + byte[] contentData = new byte[payload.remaining()]; + payload.get(contentData); + payload.dispose(); + String receivedContent = new String(contentData, StandardCharsets.UTF_8); + + assertThat(receivedContent, is(equalTo(messageContent))); + assertThat(getBrokerAdmin().getQueueDepthMessages(queueName), is(equalTo(1))); + + interaction.basic().ackDeliveryTag(delivery.getDeliveryTag()) + .ack() + .channel().close().consumeResponse(ChannelCloseOkBody.class); + assertThat(getBrokerAdmin().getQueueDepthMessages(queueName), is(equalTo(0))); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ChannelTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ChannelTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ChannelTest.java new file mode 100644 index 0000000..4ca4ae8 --- /dev/null +++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ChannelTest.java @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.protocol.v0_8; + +import java.net.InetSocketAddress; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody; +import org.apache.qpid.tests.protocol.SpecificationTest; +import org.apache.qpid.tests.utils.BrokerAdmin; +import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; + +public class ChannelTest extends BrokerAdminUsingTestBase +{ + private InetSocketAddress _brokerAddress; + + @Before + public void setUp() + { + _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); + } + + @Test + @SpecificationTest(section = "1.4.2.1", description = "start connection negotiation") + public void channelOpen() throws Exception + { + try(FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + interaction.openAnonymousConnection() + .channel().open().consumeResponse(ChannelOpenOkBody.class); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java new file mode 100644 index 0000000..1fbbf2a --- /dev/null +++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java @@ -0,0 +1,183 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.protocol.v0_8; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +import java.net.InetSocketAddress; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseBody; +import org.apache.qpid.server.protocol.v0_8.transport.ConnectionOpenOkBody; +import org.apache.qpid.server.protocol.v0_8.transport.ConnectionStartBody; +import org.apache.qpid.server.protocol.v0_8.transport.ConnectionTuneBody; +import org.apache.qpid.tests.protocol.SpecificationTest; +import org.apache.qpid.tests.utils.BrokerAdmin; +import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; + +public class ConnectionTest extends BrokerAdminUsingTestBase +{ + private InetSocketAddress _brokerAddress; + + @Before + public void setUp() + { + _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); + } + + @Test + @SpecificationTest(section = "1.4.2.1", description = "start connection negotiation") + public void connectionStart() throws Exception + { + try(FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + ConnectionStartBody response = + interaction.negotiateProtocol().consumeResponse().getLatestResponse(ConnectionStartBody.class); + + assertThat(response.getVersionMajor(), is(equalTo((short)transport.getProtocolVersion().getMajorVersion()))); + assertThat(response.getVersionMinor(), is(equalTo((short)transport.getProtocolVersion().getActualMinorVersion()))); + } + } + + @Test + @SpecificationTest(section = "1.4.2.2", description = "select security mechanism and locale") + public void connectionStartOk() throws Exception + { + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + interaction.negotiateProtocol() + .consumeResponse(ConnectionStartBody.class) + .connection().startOkMechanism("ANONYMOUS") + .startOk() + .consumeResponse(); + + interaction.getLatestResponse(ConnectionTuneBody.class); + } + } + + @Test + @SpecificationTest(section = "1.4.2.5", description = "select security mechanism and locale") + public void connectionTuneOkAndOpen() throws Exception + { + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + ConnectionTuneBody response = interaction.negotiateProtocol() + .consumeResponse(ConnectionStartBody.class) + .connection().startOkMechanism("ANONYMOUS") + .startOk() + .consumeResponse().getLatestResponse(ConnectionTuneBody.class); + + interaction.connection().tuneOkChannelMax(response.getChannelMax()) + .tuneOkFrameMax(response.getFrameMax()) + .tuneOkHeartbeat(response.getHeartbeat()) + .tuneOk() + .connection().open() + .consumeResponse().getLatestResponse(ConnectionOpenOkBody.class); + } + } + + @Test + @SpecificationTest(section = "1.4.2.5", description = "[...] the minimum negotiated value for frame-max is also" + + " frame-min-size [4096].") + public void tooSmallFrameSize() throws Exception + { + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + ConnectionTuneBody response = interaction.negotiateProtocol() + .consumeResponse(ConnectionStartBody.class) + .connection().startOkMechanism("ANONYMOUS") + .startOk() + .consumeResponse().getLatestResponse(ConnectionTuneBody.class); + + interaction.connection().tuneOkChannelMax(response.getChannelMax()) + .tuneOkFrameMax(1024) + .tuneOkHeartbeat(response.getHeartbeat()) + .tuneOk() + .consumeResponse().getLatestResponse(ConnectionCloseBody.class); + } + } + + @Test + @SpecificationTest(section = "1.4.2.5.2.", description = "If the client specifies a frame max that is higher than" + + " the value provided by the server, the server MUST" + + " close the connection without attempting a negotiated" + + " close. The server may report the error in some fashion" + + " to assist implementors.") + public void tooLargeFrameSize() throws Exception + { + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + ConnectionTuneBody response = interaction.negotiateProtocol() + .consumeResponse(ConnectionStartBody.class) + .connection().startOkMechanism("ANONYMOUS") + .startOk() + .consumeResponse().getLatestResponse(ConnectionTuneBody.class); + + interaction.connection().tuneOkChannelMax(response.getChannelMax()) + .tuneOkFrameMax(Long.MAX_VALUE) + .tuneOkHeartbeat(response.getHeartbeat()) + .tuneOk() + .consumeResponse().getLatestResponse(ConnectionCloseBody.class); + } + } + + @Test + @SpecificationTest(section = "1.4.", description = "open connection = C:protocolheader S:START C:START OK" + + " *challenge S:TUNE C:TUNE OK C:OPEN S:OPEN OK") + public void authenticationBypassBySendingTuneOk() throws Exception + { + final InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP); + try (FrameTransport transport = new FrameTransport(brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + interaction.negotiateProtocol().consumeResponse(ConnectionStartBody.class) + .connection().tuneOk() + .consumeResponse().getLatestResponse(ConnectionCloseBody.class); + } + } + + + @Test + @SpecificationTest(section = "1.4.", description = "open connection = C:protocolheader S:START C:START OK" + + " *challenge S:TUNE C:TUNE OK C:OPEN S:OPEN OK") + public void authenticationBypassBySendingOpen() throws Exception + { + final InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP); + try (FrameTransport transport = new FrameTransport(brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + interaction.negotiateProtocol().consumeResponse(ConnectionStartBody.class) + .connection().open() + .consumeResponse().getLatestResponse(ConnectionCloseBody.class); + } + } + + +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java new file mode 100644 index 0000000..920ea73 --- /dev/null +++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java @@ -0,0 +1,67 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.protocol.v0_8; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.net.InetSocketAddress; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.qpid.server.protocol.v0_8.AMQShortString; +import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody; +import org.apache.qpid.server.protocol.v0_8.transport.QueueDeclareOkBody; +import org.apache.qpid.tests.protocol.SpecificationTest; +import org.apache.qpid.tests.utils.BrokerAdmin; +import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; + +public class QueueTest extends BrokerAdminUsingTestBase +{ + private InetSocketAddress _brokerAddress; + + @Before + public void setUp() + { + _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); + } + + @Test + @SpecificationTest(section = "1.4.2.1", description = "start connection negotiation") + public void queueDeclare() throws Exception + { + try(FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + final String queueName = "testQueue"; + QueueDeclareOkBody response = interaction.openAnonymousConnection() + .channel().open().consumeResponse(ChannelOpenOkBody.class) + .queue().declareName(queueName).declare() + .consumeResponse().getLatestResponse(QueueDeclareOkBody.class); + + assertThat(response.getQueue(), is(equalTo(AMQShortString.valueOf(queueName)))); + assertThat(response.getMessageCount(), is(equalTo(0L))); + assertThat(response.getConsumerCount(), is(equalTo(0L))); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org