Repository: qpid-broker-j Updated Branches: refs/heads/master f03a3c460 -> b36c7180f
QPID-7820: [Java Broker] [Protocol Tests] Extend protocol test framework to test AMQP 1.0 websocket too Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/b36c7180 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/b36c7180 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/b36c7180 Branch: refs/heads/master Commit: b36c7180f0d90a332f065e9a348e8dc0ae1972d4 Parents: f03a3c4 Author: Keith Wall <[email protected]> Authored: Sat Jun 10 17:36:35 2017 +0100 Committer: Keith Wall <[email protected]> Committed: Sat Jun 10 18:24:21 2017 +0100 ---------------------------------------------------------------------- .../transport/websocket/WebSocketProvider.java | 5 +- pom.xml | 5 + systests/protocol-tests-amqp-1-0/pom.xml | 9 + .../qpid/tests/protocol/v1_0/BrokerAdmin.java | 1 + .../tests/protocol/v1_0/FrameTransport.java | 65 +++--- .../apache/qpid/tests/protocol/v1_0/Utils.java | 4 +- .../websocket/WebSocketFrameTransport.java | 224 +++++++++++++++++++ .../main/resources/config-protocol-tests.json | 20 ++ .../bindmapjms/TemporaryDestinationTest.java | 4 +- .../soleconn/CloseExistingPolicy.java | 14 +- .../v1_0/extensions/soleconn/MixedPolicy.java | 10 +- .../soleconn/RefuseConnectionPolicy.java | 18 +- .../extensions/websocket/WebSocketTest.java | 114 ++++++++++ .../v1_0/messaging/DeleteOnCloseTest.java | 8 +- .../protocol/v1_0/messaging/TransferTest.java | 12 +- .../v1_0/transport/ProtocolHeaderTest.java | 19 +- .../v1_0/transport/connection/OpenTest.java | 8 +- .../v1_0/transport/link/AttachTest.java | 6 +- .../protocol/v1_0/transport/link/FlowTest.java | 8 +- .../v1_0/transport/security/sasl/SaslTest.java | 16 +- .../v1_0/transport/session/BeginTest.java | 6 +- 21 files changed, 468 insertions(+), 108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java ---------------------------------------------------------------------- diff --git a/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java b/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java index f1b5e2d..7784dac 100644 --- a/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java +++ b/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java @@ -429,7 +429,10 @@ class WebSocketProvider implements AcceptingTransport @Override public void onWebSocketClose(final int statusCode, final String reason) { - _protocolEngine.closed(); + if (_protocolEngine != null) + { + _protocolEngine.closed(); + } _activeConnections.remove(_connectionWrapper); _idleTimeoutChecker.wakeup(); } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index da1fe62..45f6fa1 100644 --- a/pom.xml +++ b/pom.xml @@ -371,6 +371,11 @@ <version>${netty-version}</version> </dependency> <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-codec-http</artifactId> + <version>${netty-version}</version> + </dependency> + <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest-core</artifactId> <version>${hamcrest-version}</version> http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/pom.xml ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/pom.xml b/systests/protocol-tests-amqp-1-0/pom.xml index ef33da3..9aa5295 100644 --- a/systests/protocol-tests-amqp-1-0/pom.xml +++ b/systests/protocol-tests-amqp-1-0/pom.xml @@ -43,6 +43,11 @@ </dependency> <dependency> <groupId>org.apache.qpid</groupId> + <artifactId>qpid-broker-plugins-websocket</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.qpid</groupId> <artifactId>qpid-broker-codegen</artifactId> <version>${project.version}</version> </dependency> @@ -111,6 +116,10 @@ </dependency> <dependency> <groupId>io.netty</groupId> + <artifactId>netty-codec-http</artifactId> + </dependency> + <dependency> + <groupId>io.netty</groupId> <artifactId>netty-handler</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java index e4efc76..a263b2d 100644 --- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java +++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java @@ -48,6 +48,7 @@ public interface BrokerAdmin extends Pluggable enum PortType { ANONYMOUS_AMQP, + ANONYMOUS_AMQPWS, AMQP } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java index eda903a..16eb3d8 100644 --- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java +++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java @@ -27,7 +27,6 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import java.net.InetSocketAddress; -import java.nio.channels.ClosedChannelException; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collections; @@ -38,6 +37,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import com.google.common.base.Preconditions; import com.google.common.util.concurrent.JdkFutureAdapters; import com.google.common.util.concurrent.ListenableFuture; import io.netty.bootstrap.Bootstrap; @@ -47,6 +47,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; @@ -78,10 +79,12 @@ public class FrameTransport implements AutoCloseable private static final Set<Integer> AMQP_CONNECTION_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>()); private static final Response CHANNEL_CLOSED_RESPONSE = new ChannelClosedResponse(); - private final Channel _channel; private final BlockingQueue<Response> _queue = new ArrayBlockingQueue<>(100); private final EventLoopGroup _workerGroup; + private final InetSocketAddress _brokerAddress; + private final boolean _isSasl; + private Channel _channel; private volatile boolean _channelClosedSeen = false; private int _amqpConnectionId; private short _amqpChannelId; @@ -93,8 +96,18 @@ public class FrameTransport implements AutoCloseable public FrameTransport(final InetSocketAddress brokerAddress, boolean isSasl) { + _brokerAddress = brokerAddress; + _isSasl = isSasl; _workerGroup = new NioEventLoopGroup(); + } + + public InetSocketAddress getBrokerAddress() + { + return _brokerAddress; + } + public FrameTransport connect() + { try { Bootstrap b = new Bootstrap(); @@ -106,11 +119,12 @@ public class FrameTransport implements AutoCloseable @Override public void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast(new InputHandler(_queue, isSasl)).addLast(new OutputHandler()); + ChannelPipeline pipeline = ch.pipeline(); + buildInputOutputPipeline(pipeline); } }); - _channel = b.connect(brokerAddress).sync().channel(); + _channel = b.connect(_brokerAddress).sync().channel(); _channel.closeFuture().addListener(future -> { _channelClosedSeen = true; @@ -121,6 +135,12 @@ public class FrameTransport implements AutoCloseable { throw new RuntimeException(e); } + return this; + } + + protected void buildInputOutputPipeline(final ChannelPipeline pipeline) + { + pipeline.addLast(new InputHandler(_queue, _isSasl)).addLast(new OutputHandler()); } @Override @@ -128,8 +148,12 @@ public class FrameTransport implements AutoCloseable { try { - _channel.disconnect().sync(); - _channel.close().sync(); + if (_channel != null) + { + _channel.disconnect().sync(); + _channel.close().sync(); + _channel = null; + } } finally { @@ -140,6 +164,7 @@ public class FrameTransport implements AutoCloseable public ListenableFuture<Void> sendProtocolHeader(final byte[] bytes) throws Exception { + Preconditions.checkState(_channel != null, "Not connected"); ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); buffer.writeBytes(bytes); ChannelFuture channelFuture = _channel.writeAndFlush(buffer); @@ -149,6 +174,7 @@ public class FrameTransport implements AutoCloseable public ListenableFuture<Void> sendPerformative(final FrameBody frameBody, UnsignedShort channel) throws Exception { + Preconditions.checkState(_channel != null, "Not connected"); final List<QpidByteBuffer> payload = frameBody instanceof Transfer ? ((Transfer) frameBody).getPayload() : null; TransportFrame transportFrame = new TransportFrame(channel.shortValue(), frameBody, payload); ChannelFuture channelFuture = _channel.writeAndFlush(transportFrame); @@ -372,7 +398,6 @@ public class FrameTransport implements AutoCloseable assertThat(_channelClosedSeen, is(true)); } - private int getConnectionId() { if (_amqpConnectionId == 0) @@ -386,32 +411,6 @@ public class FrameTransport implements AutoCloseable return _amqpConnectionId; } - public void assertChannelClosed() - { - try - { - ChannelFuture channelFuture = _channel.write(new byte[]{0}); - channelFuture.sync(); - throw new IllegalStateException( - "Expecting the channel to be already closed by, but it was able to take more input."); - } - catch (InterruptedException e) - { - Thread.currentThread().interrupt(); - } - catch (Exception e) - { - if (e instanceof ClosedChannelException) - { - // PASS - } - else - { - throw new IllegalStateException("Unexpected exception", e); - } - } - } - private static class ChannelClosedResponse implements Response { @Override http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java index a9491e3..b892589 100644 --- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java +++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java @@ -43,7 +43,7 @@ public class Utils public static boolean doesNodeExist(final InetSocketAddress brokerAddress, final String nodeAddress) throws Exception { - try (FrameTransport transport = new FrameTransport(brokerAddress)) + try (FrameTransport transport = new FrameTransport(brokerAddress).connect()) { transport.doBeginSession(); @@ -86,7 +86,7 @@ public class Utils public static Object receiveMessage(final InetSocketAddress brokerAddress, final String queueName) throws Exception { - try (FrameTransport transport = new FrameTransport(brokerAddress)) + try (FrameTransport transport = new FrameTransport(brokerAddress).connect()) { transport.doAttachReceivingLink(queueName); Flow flow = new Flow(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketFrameTransport.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketFrameTransport.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketFrameTransport.java new file mode 100644 index 0000000..b5ccd08 --- /dev/null +++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketFrameTransport.java @@ -0,0 +1,224 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.tests.protocol.v1_0.extensions.websocket; + +import java.net.InetSocketAddress; +import java.net.URI; +import java.nio.charset.StandardCharsets; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.DefaultHttpHeaders; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpClientCodec; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; +import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory; +import io.netty.handler.codec.http.websocketx.WebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketVersion; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.tests.protocol.v1_0.FrameTransport; + +public class WebSocketFrameTransport extends FrameTransport +{ + private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketFrameTransport.class); + + private WebSocketFramingOutputHandler _webSocketFramingOutputHandler; + private WebSocketDeframingInputHandler _webSocketDeframingInputHandler; + private WebSocketClientHandler _webSocketClientHandler; + + public WebSocketFrameTransport(final InetSocketAddress addr) + { + super(addr); + } + + @Override + protected void buildInputOutputPipeline(final ChannelPipeline pipeline) + { + URI uri = URI.create(String.format("tcp://%s:%d/", + getBrokerAddress().getHostString(), + getBrokerAddress().getPort())); + _webSocketClientHandler = new WebSocketClientHandler( + WebSocketClientHandshakerFactory.newHandshaker( + uri, WebSocketVersion.V13, "amqp", false, new DefaultHttpHeaders()), uri); + _webSocketFramingOutputHandler = new WebSocketFramingOutputHandler(); + _webSocketDeframingInputHandler = new WebSocketDeframingInputHandler(); + + pipeline.addLast(new HttpClientCodec()); + pipeline.addLast(new HttpObjectAggregator(65536)); + pipeline.addLast(_webSocketClientHandler); + pipeline.addLast(_webSocketFramingOutputHandler); + pipeline.addLast(_webSocketDeframingInputHandler); + super.buildInputOutputPipeline(pipeline); + } + + @Override + public WebSocketFrameTransport connect() + { + super.connect(); + _webSocketClientHandler.handshakeFuture().syncUninterruptibly(); + return this; + } + + WebSocketFrameTransport splitAmqpFrames() + { + _webSocketFramingOutputHandler.splitAmqpFrames(); + return this; + } + + private class WebSocketFramingOutputHandler extends ChannelOutboundHandlerAdapter + { + private boolean _splitFrames; + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception + { + if (msg instanceof ByteBuf) + { + final ByteBuf buf = ((ByteBuf) msg); + if (_splitFrames) + { + buf.forEachByte(b -> + { + ByteBuf byteBuf = Unpooled.copiedBuffer(new byte[] {b}); + BinaryWebSocketFrame frame = new BinaryWebSocketFrame(byteBuf); + ctx.write(frame, promise); + return false; + }); + } + else + { + BinaryWebSocketFrame frame = new BinaryWebSocketFrame((ByteBuf) msg); + ctx.write(frame, promise); + } + } + else + { + ctx.write(msg, promise); + } + } + + void splitAmqpFrames() + { + _splitFrames = true; + } + } + + private class WebSocketDeframingInputHandler extends ChannelInboundHandlerAdapter + { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) + { + if (msg instanceof WebSocketFrame) + { + WebSocketFrame frame = (WebSocketFrame) msg; + ctx.fireChannelRead(frame.content()); + } + else + { + ctx.fireChannelRead(msg); + } + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) + { + ctx.flush(); + } + } + + public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> + { + + private final WebSocketClientHandshaker _handshaker; + private ChannelPromise _handshakeFuture; + + WebSocketClientHandler(final WebSocketClientHandshaker handshaker, final URI uri) + { + _handshaker = handshaker; + } + + ChannelFuture handshakeFuture() + { + return _handshakeFuture; + } + + @Override + public void handlerAdded(final ChannelHandlerContext ctx) + { + _handshakeFuture = ctx.newPromise(); + } + + @Override + public void channelActive(final ChannelHandlerContext ctx) + { + _handshaker.handshake(ctx.channel()); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object msg) + { + final Channel ch = ctx.channel(); + if (!_handshaker.isHandshakeComplete()) + { + // web socket client connected + _handshaker.finishHandshake(ch, (FullHttpResponse) msg); + _handshakeFuture.setSuccess(); + return; + } + + if (msg instanceof FullHttpResponse) + { + final FullHttpResponse response = (FullHttpResponse) msg; + throw new IllegalStateException(String.format("Unexpected FullHttpResponse (getStatus=%s, content=%s)", + response.content().toString(StandardCharsets.UTF_8), response.status())); + } + + WebSocketFrame frame = (WebSocketFrame) msg; + ctx.fireChannelRead(frame.retain()); + } + + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) + { + LOGGER.error("exceptionCaught", cause); + + if (!_handshakeFuture.isDone()) + { + _handshakeFuture.setFailure(cause); + } + ctx.close(); + } + } + +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/main/resources/config-protocol-tests.json ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/main/resources/config-protocol-tests.json b/systests/protocol-tests-amqp-1-0/src/main/resources/config-protocol-tests.json index 1aaa210..764ff89 100644 --- a/systests/protocol-tests-amqp-1-0/src/main/resources/config-protocol-tests.json +++ b/systests/protocol-tests-amqp-1-0/src/main/resources/config-protocol-tests.json @@ -73,6 +73,26 @@ "type" : "nameAlias", "durable" : true } ] + }, { + "name" : "ANONYMOUS_AMQPWS", + "type" : "AMQP", + "authenticationProvider" : "anon", + "port" : "0", + "transports" : ["WS"], + "protocols" : [ "AMQP_1_0" ], + "virtualhostaliases" : [ { + "name" : "defaultAlias", + "type" : "defaultAlias", + "durable" : true + }, { + "name" : "hostnameAlias", + "type" : "hostnameAlias", + "durable" : true + }, { + "name" : "nameAlias", + "type" : "nameAlias", + "durable" : true + } ] } ], "virtualhostnodes" : [] } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java index 9f7c868..5583f2a 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java @@ -82,7 +82,7 @@ public class TemporaryDestinationTest extends ProtocolTestBase { String newTemporaryNodeAddress = null; - try (FrameTransport transport = new FrameTransport(_brokerAddress)) + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { transport.doBeginSession(); @@ -124,7 +124,7 @@ public class TemporaryDestinationTest extends ProtocolTestBase transport.doCloseConnection(); } - try (FrameTransport transport = new FrameTransport(_brokerAddress)) + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { transport.doBeginSession(); assertThat(Utils.doesNodeExist(_brokerAddress, newTemporaryNodeAddress), is(false)); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java index b250bd9..c3e5999 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java @@ -64,7 +64,7 @@ public class CloseExistingPolicy extends ProtocolTestBase @Test public void basicNegotiation() throws Exception { - try (FrameTransport transport = new FrameTransport(_brokerAddress)) + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { transport.doProtocolNegotiation(); Open open = new Open(); @@ -92,7 +92,7 @@ public class CloseExistingPolicy extends ProtocolTestBase @Test public void existingConnectionClosed() throws Exception { - try (FrameTransport transport1 = new FrameTransport(_brokerAddress)) + try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect()) { transport1.doProtocolNegotiation(); Open open = new Open(); @@ -107,7 +107,7 @@ public class CloseExistingPolicy extends ProtocolTestBase assertThat(response, is(notNullValue())); assertThat(response.getFrameBody(), is(instanceOf(Open.class))); - try (FrameTransport transport2 = new FrameTransport(_brokerAddress)) + try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect()) { transport2.doProtocolNegotiation(); Open open2 = new Open(); @@ -145,7 +145,7 @@ public class CloseExistingPolicy extends ProtocolTestBase @Test public void weakDetection() throws Exception { - try (FrameTransport transport1 = new FrameTransport(_brokerAddress)) + try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect()) { transport1.doProtocolNegotiation(); Open open = new Open(); @@ -158,7 +158,7 @@ public class CloseExistingPolicy extends ProtocolTestBase assertThat(response, is(notNullValue())); assertThat(response.getFrameBody(), is(instanceOf(Open.class))); - try (FrameTransport transport2 = new FrameTransport(_brokerAddress)) + try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect()) { transport2.doProtocolNegotiation(); Open open2 = new Open(); @@ -195,7 +195,7 @@ public class CloseExistingPolicy extends ProtocolTestBase @Test public void strongDetection() throws Exception { - try (FrameTransport transport1 = new FrameTransport(_brokerAddress)) + try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect()) { transport1.doProtocolNegotiation(); Open open = new Open(); @@ -217,7 +217,7 @@ public class CloseExistingPolicy extends ProtocolTestBase is(equalTo(SoleConnectionDetectionPolicy.STRONG.getValue()))); } - try (FrameTransport transport2 = new FrameTransport(_brokerAddress)) + try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect()) { transport2.doProtocolNegotiation(); Open open2 = new Open(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/MixedPolicy.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/MixedPolicy.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/MixedPolicy.java index 857368c..0248a9f 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/MixedPolicy.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/MixedPolicy.java @@ -56,7 +56,7 @@ public class MixedPolicy extends ProtocolTestBase @Test public void firstCloseThenRefuse() throws Exception { - try (FrameTransport transport1 = new FrameTransport(_brokerAddress)) + try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect()) { transport1.doProtocolNegotiation(); Open open = new Open(); @@ -71,7 +71,7 @@ public class MixedPolicy extends ProtocolTestBase assertThat(response, is(notNullValue())); assertThat(response.getFrameBody(), is(instanceOf(Open.class))); - try (FrameTransport transport2 = new FrameTransport(_brokerAddress)) + try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect()) { transport2.doProtocolNegotiation(); Open open2 = new Open(); @@ -90,7 +90,7 @@ public class MixedPolicy extends ProtocolTestBase assertThat(response2, is(notNullValue())); assertThat(response2.getFrameBody(), is(instanceOf(Open.class))); - try (FrameTransport transport3 = new FrameTransport(_brokerAddress)) + try (FrameTransport transport3 = new FrameTransport(_brokerAddress).connect()) { transport3.doProtocolNegotiation(); Open open3 = new Open(); @@ -115,7 +115,7 @@ public class MixedPolicy extends ProtocolTestBase @Test public void firstRefuseThenClose() throws Exception { - try (FrameTransport transport1 = new FrameTransport(_brokerAddress)) + try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect()) { transport1.doProtocolNegotiation(); Open open = new Open(); @@ -130,7 +130,7 @@ public class MixedPolicy extends ProtocolTestBase assertThat(response, is(notNullValue())); assertThat(response.getFrameBody(), is(instanceOf(Open.class))); - try (FrameTransport transport2 = new FrameTransport(_brokerAddress)) + try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect()) { transport2.doProtocolNegotiation(); Open open2 = new Open(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/RefuseConnectionPolicy.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/RefuseConnectionPolicy.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/RefuseConnectionPolicy.java index d0f3f04..409cc17 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/RefuseConnectionPolicy.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/RefuseConnectionPolicy.java @@ -65,7 +65,7 @@ public class RefuseConnectionPolicy extends ProtocolTestBase @Test public void basicNegotiation() throws Exception { - try (FrameTransport transport = new FrameTransport(_brokerAddress)) + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect();) { transport.doProtocolNegotiation(); Open open = new Open(); @@ -93,7 +93,7 @@ public class RefuseConnectionPolicy extends ProtocolTestBase @Test public void newConnectionRefused() throws Exception { - try (FrameTransport transport1 = new FrameTransport(_brokerAddress)) + try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect()) { transport1.doProtocolNegotiation(); Open open = new Open(); @@ -108,7 +108,7 @@ public class RefuseConnectionPolicy extends ProtocolTestBase assertThat(response, is(notNullValue())); assertThat(response.getFrameBody(), is(instanceOf(Open.class))); - try (FrameTransport transport2 = new FrameTransport(_brokerAddress)) + try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect()) { transport2.doProtocolNegotiation(); Open open2 = new Open(); @@ -142,7 +142,7 @@ public class RefuseConnectionPolicy extends ProtocolTestBase @Test public void weakDetection() throws Exception { - try (FrameTransport transport1 = new FrameTransport(_brokerAddress)) + try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect()) { transport1.doProtocolNegotiation(); Open open = new Open(); @@ -155,7 +155,7 @@ public class RefuseConnectionPolicy extends ProtocolTestBase assertThat(response, is(notNullValue())); assertThat(response.getFrameBody(), is(instanceOf(Open.class))); - try (FrameTransport transport2 = new FrameTransport(_brokerAddress)) + try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect()) { transport2.doProtocolNegotiation(); Open open2 = new Open(); @@ -188,7 +188,7 @@ public class RefuseConnectionPolicy extends ProtocolTestBase @Test public void strongDetection() throws Exception { - try (FrameTransport transport1 = new FrameTransport(_brokerAddress)) + try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect()) { transport1.doProtocolNegotiation(); Open open = new Open(); @@ -210,7 +210,7 @@ public class RefuseConnectionPolicy extends ProtocolTestBase is(equalTo(SoleConnectionDetectionPolicy.STRONG.getValue()))); } - try (FrameTransport transport2 = new FrameTransport(_brokerAddress)) + try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect()) { transport2.doProtocolNegotiation(); Open open2 = new Open(); @@ -241,7 +241,7 @@ public class RefuseConnectionPolicy extends ProtocolTestBase @Test public void refuseIsDefault() throws Exception { - try (FrameTransport transport1 = new FrameTransport(_brokerAddress)) + try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect()) { transport1.doProtocolNegotiation(); Open open = new Open(); @@ -255,7 +255,7 @@ public class RefuseConnectionPolicy extends ProtocolTestBase assertThat(response, is(notNullValue())); assertThat(response.getFrameBody(), is(instanceOf(Open.class))); - try (FrameTransport transport2 = new FrameTransport(_brokerAddress)) + try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect()) { transport2.doProtocolNegotiation(); Open open2 = new Open(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java new file mode 100644 index 0000000..23026d8 --- /dev/null +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java @@ -0,0 +1,114 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.tests.protocol.v1_0.extensions.websocket; + +import static org.hamcrest.CoreMatchers.both; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertArrayEquals; + +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; + +import org.junit.Ignore; +import org.junit.Test; + +import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort; +import org.apache.qpid.server.protocol.v1_0.type.transport.Open; +import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin; +import org.apache.qpid.tests.protocol.v1_0.FrameTransport; +import org.apache.qpid.tests.protocol.v1_0.HeaderResponse; +import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase; +import org.apache.qpid.tests.protocol.v1_0.SpecificationTest; + +public class WebSocketTest extends ProtocolTestBase +{ + @Test + @SpecificationTest(section = "2.1", description = "Opening a WebSocket Connection") + public void protocolHeader() throws Exception + { + final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQPWS); + try (FrameTransport transport = new WebSocketFrameTransport(addr).connect()) + { + byte[] bytes = "AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8); + transport.sendProtocolHeader(bytes); + HeaderResponse response = (HeaderResponse) transport.getNextResponse(); + assertArrayEquals("Unexpected protocol header response", bytes, response.getHeader()); + } + } + + @Test + @SpecificationTest(section = "2.4", description = "[...] a single AMQP frame MAY be split over one or more consecutive WebSocket messages. ") + @Ignore("QPID-7817") + public void amqpFramesSplitOverManyWebSocketFrames() throws Exception + { + final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQPWS); + try (FrameTransport transport = new WebSocketFrameTransport(addr).splitAmqpFrames().connect()) + { + byte[] bytes = "AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8); + transport.sendProtocolHeader(bytes); + HeaderResponse response = (HeaderResponse) transport.getNextResponse(); + assertArrayEquals("Unexpected protocol header response", bytes, response.getHeader()); + + Open open = new Open(); + open.setContainerId("testContainerId"); + transport.sendPerformative(open, UnsignedShort.valueOf((short) 0)); + Open responseOpen = transport.getNextPerformativeResponse(Open.class); + + assertThat(responseOpen.getContainerId(), is(notNullValue())); + assertThat(responseOpen.getMaxFrameSize().longValue(), + is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue())))); + assertThat(responseOpen.getChannelMax().intValue(), + is(both(greaterThanOrEqualTo(0)).and(lessThan(UnsignedShort.MAX_VALUE.intValue())))); + + transport.doCloseConnection(); + } + } + + @Test + @SpecificationTest(section = "2.1", description = "Opening a WebSocket Connection") + public void successfulOpen() throws Exception + { + final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQPWS); + try (FrameTransport transport = new WebSocketFrameTransport(addr).connect()) + { + transport.doProtocolNegotiation(); + + Open open = new Open(); + open.setContainerId("testContainerId"); + transport.sendPerformative(open, UnsignedShort.valueOf((short) 0)); + Open responseOpen = transport.getNextPerformativeResponse(Open.class); + + assertThat(responseOpen.getContainerId(), is(notNullValue())); + assertThat(responseOpen.getMaxFrameSize().longValue(), + is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue())))); + assertThat(responseOpen.getChannelMax().intValue(), + is(both(greaterThanOrEqualTo(0)).and(lessThan(UnsignedShort.MAX_VALUE.intValue())))); + + transport.doCloseConnection(); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java index 2a9016d..44b2b21 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java @@ -67,7 +67,7 @@ public class DeleteOnCloseTest extends ProtocolTestBase + "creation ceases to exist.") public void deleteOnCloseOnSource() throws Exception { - try (FrameTransport transport = new FrameTransport(_brokerAddress)) + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { transport.doBeginSession(); @@ -113,7 +113,7 @@ public class DeleteOnCloseTest extends ProtocolTestBase + "creation ceases to exist.") public void deleteOnCloseOnTarget() throws Exception { - try (FrameTransport transport = new FrameTransport(_brokerAddress)) + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { transport.doBeginSession(); @@ -163,7 +163,7 @@ public class DeleteOnCloseTest extends ProtocolTestBase + "creation ceases to exist.") public void doesNotDeleteOnDetach() throws Exception { - try (FrameTransport transport = new FrameTransport(_brokerAddress)) + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { transport.doBeginSession(); @@ -208,7 +208,7 @@ public class DeleteOnCloseTest extends ProtocolTestBase assumeThat(getBrokerAdmin().supportsRestart(), is(true)); final String newTempQueueAddress; - try (FrameTransport transport = new FrameTransport(_brokerAddress)) + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { transport.doBeginSession(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java index 92781d5..c099e20 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java @@ -79,7 +79,7 @@ public class TransferTest extends ProtocolTestBase description = "Transfer without mandatory fields should result in a decoding error.") public void emptyTransfer() throws Exception { - try (FrameTransport transport = new FrameTransport(_brokerAddress)) + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { final UnsignedInteger linkHandle = UnsignedInteger.ZERO; transport.doAttachSendingLink(linkHandle, BrokerAdmin.TEST_QUEUE_NAME); @@ -103,7 +103,7 @@ public class TransferTest extends ProtocolTestBase + "[...] and can only be omitted for continuation transfers.") public void transferWithoutDeliveryTag() throws Exception { - try (FrameTransport transport = new FrameTransport(_brokerAddress)) + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { final UnsignedInteger linkHandle = UnsignedInteger.ONE; transport.doAttachSendingLink(linkHandle, BrokerAdmin.TEST_QUEUE_NAME); @@ -133,7 +133,7 @@ public class TransferTest extends ProtocolTestBase public void transferUnsettled() throws Exception { String sentData = "foo"; - try (FrameTransport transport = new FrameTransport(_brokerAddress)) + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { final UnsignedInteger linkHandle = UnsignedInteger.ZERO; transport.doAttachSendingLink(linkHandle, BrokerAdmin.TEST_QUEUE_NAME); @@ -165,7 +165,7 @@ public class TransferTest extends ProtocolTestBase public void transferReceiverSettleModeFirst() throws Exception { String sentData = "foo"; - try (FrameTransport transport = new FrameTransport(_brokerAddress)) + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { final UnsignedInteger linkHandle = UnsignedInteger.ZERO; Attach attach = new Attach(); @@ -210,7 +210,7 @@ public class TransferTest extends ProtocolTestBase public void transferReceiverSettleModeCannotBeSecondWhenLinkModeIsFirst() throws Exception { String sentData = "foo"; - try (FrameTransport transport = new FrameTransport(_brokerAddress)) + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { final UnsignedInteger linkHandle = UnsignedInteger.ZERO; Attach attach = new Attach(); @@ -253,7 +253,7 @@ public class TransferTest extends ProtocolTestBase @SpecificationTest(section = "", description = "Pipelined message send") public void presettledPipelined() throws Exception { - try (FrameTransport transport = new FrameTransport(_brokerAddress)) + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { byte[] protocolHeader = "AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8); Open open = new Open(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java index ed9c10f..b68ae8d 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java @@ -33,21 +33,6 @@ import org.apache.qpid.tests.protocol.v1_0.HeaderResponse; import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase; import org.apache.qpid.tests.protocol.v1_0.SpecificationTest; - -/* - -TODO - -logging - log per test? -protocol assertions -admin factory -performative test -embedded broker per test admin impl that creates broker per test -embedded broker per class admin impl creates/destroys vhost per test -queue creation? - */ - - public class ProtocolHeaderTest extends ProtocolTestBase { @Test @@ -59,7 +44,7 @@ public class ProtocolHeaderTest extends ProtocolTestBase public void successfulHeaderExchange() throws Exception { final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); - try (FrameTransport transport = new FrameTransport(addr)) + try (FrameTransport transport = new FrameTransport(addr).connect()) { byte[] bytes = "AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8); transport.sendProtocolHeader(bytes); @@ -76,7 +61,7 @@ public class ProtocolHeaderTest extends ProtocolTestBase public void unacceptableProtocolIdSent_SaslAcceptable() throws Exception { final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP); - try (FrameTransport transport = new FrameTransport(addr)) + try (FrameTransport transport = new FrameTransport(addr).connect()) { byte[] rawHeaderBytes = "AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8); byte[] expectedSaslHeaderBytes = "AMQP\3\1\0\0".getBytes(StandardCharsets.UTF_8); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java index 29bbd35..d3c83b5 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java @@ -55,7 +55,7 @@ public class OpenTest extends ProtocolTestBase public void emptyOpen() throws Exception { final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); - try (FrameTransport transport = new FrameTransport(addr)) + try (FrameTransport transport = new FrameTransport(addr).connect()) { transport.doProtocolNegotiation(); Open open = new Open(); @@ -80,7 +80,7 @@ public class OpenTest extends ProtocolTestBase public void successfulOpen() throws Exception { final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); - try (FrameTransport transport = new FrameTransport(addr)) + try (FrameTransport transport = new FrameTransport(addr).connect()) { transport.doProtocolNegotiation(); Open open = new Open(); @@ -107,7 +107,7 @@ public class OpenTest extends ProtocolTestBase public void failOpenOnChannelNotZero() throws Exception { final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); - try (FrameTransport transport = new FrameTransport(addr)) + try (FrameTransport transport = new FrameTransport(addr).connect()) { transport.doProtocolNegotiation(); Open open = new Open(); @@ -131,7 +131,7 @@ public class OpenTest extends ProtocolTestBase public void failOpenOnNonExistingHostname() throws Exception { final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); - try (FrameTransport transport = new FrameTransport(addr)) + try (FrameTransport transport = new FrameTransport(addr).connect()) { transport.doProtocolNegotiation(); Open open = new Open(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java index b37dff9..c5ce03e 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java @@ -53,7 +53,7 @@ public class AttachTest extends ProtocolTestBase public void emptyAttach() throws Exception { final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); - try (FrameTransport transport = new FrameTransport(addr)) + try (FrameTransport transport = new FrameTransport(addr).connect()) { transport.doBeginSession(); Attach attach = new Attach(); @@ -76,7 +76,7 @@ public class AttachTest extends ProtocolTestBase public void successfulAttachAsSender() throws Exception { final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); - try (FrameTransport transport = new FrameTransport(addr)) + try (FrameTransport transport = new FrameTransport(addr).connect()) { transport.doBeginSession(); Attach attach = new Attach(); @@ -112,7 +112,7 @@ public class AttachTest extends ProtocolTestBase String queueName = "testQueue"; getBrokerAdmin().createQueue(queueName); final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); - try (FrameTransport transport = new FrameTransport(addr)) + try (FrameTransport transport = new FrameTransport(addr).connect()) { Role localRole = Role.RECEIVER; transport.doBeginSession(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java index 9b4aa9c..1071f84 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java @@ -52,7 +52,7 @@ public class FlowTest extends ProtocolTestBase { getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME); final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); - try (FrameTransport transport = new FrameTransport(addr)) + try (FrameTransport transport = new FrameTransport(addr).connect()) { transport.doAttachReceivingLink(BrokerAdmin.TEST_QUEUE_NAME); Flow flow = new Flow(); @@ -74,7 +74,7 @@ public class FlowTest extends ProtocolTestBase public void sessionEchoFlow() throws Exception { final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); - try (FrameTransport transport = new FrameTransport(addr)) + try (FrameTransport transport = new FrameTransport(addr).connect()) { transport.doBeginSession(); Flow flow = new Flow(); @@ -102,7 +102,7 @@ public class FlowTest extends ProtocolTestBase { getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME); final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); - try (FrameTransport transport = new FrameTransport(addr)) + try (FrameTransport transport = new FrameTransport(addr).connect()) { final UnsignedInteger handle = UnsignedInteger.ONE; transport.doAttachSendingLink(handle, BrokerAdmin.TEST_QUEUE_NAME); @@ -152,7 +152,7 @@ public class FlowTest extends ProtocolTestBase { getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME); final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); - try (FrameTransport transport = new FrameTransport(addr)) + try (FrameTransport transport = new FrameTransport(addr).connect()) { transport.doAttachReceivingLink(BrokerAdmin.TEST_QUEUE_NAME); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java index ede03a2..a84b8e9 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java @@ -66,7 +66,7 @@ public class SaslTest extends ProtocolTestBase public void saslSuccessfulAuthentication() throws Exception { final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP); - try (FrameTransport transport = new FrameTransport(addr, true)) + try (FrameTransport transport = new FrameTransport(addr, true).connect()) { transport.sendProtocolHeader(SASL_AMQP_HEADER_BYTES); HeaderResponse saslHeaderResponse = transport.getNextResponse(HeaderResponse.class); @@ -99,7 +99,7 @@ public class SaslTest extends ProtocolTestBase public void saslSuccessfulAuthenticationWithChallengeResponse() throws Exception { final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP); - try (FrameTransport transport = new FrameTransport(addr, true)) + try (FrameTransport transport = new FrameTransport(addr, true).connect()) { transport.sendProtocolHeader(SASL_AMQP_HEADER_BYTES); HeaderResponse saslHeaderResponse = transport.getNextResponse(HeaderResponse.class); @@ -138,7 +138,7 @@ public class SaslTest extends ProtocolTestBase public void saslUnsuccessfulAuthentication() throws Exception { final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP); - try (FrameTransport transport = new FrameTransport(addr, true)) + try (FrameTransport transport = new FrameTransport(addr, true).connect()) { transport.sendProtocolHeader(SASL_AMQP_HEADER_BYTES); HeaderResponse saslHeaderResponse = transport.getNextResponse(HeaderResponse.class); @@ -168,7 +168,7 @@ public class SaslTest extends ProtocolTestBase public void unsupportedSaslMechanism() throws Exception { final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP); - try (FrameTransport transport = new FrameTransport(addr, true)) + try (FrameTransport transport = new FrameTransport(addr, true).connect()) { transport.sendProtocolHeader(SASL_AMQP_HEADER_BYTES); HeaderResponse saslHeaderResponse = transport.getNextResponse(HeaderResponse.class); @@ -194,7 +194,7 @@ public class SaslTest extends ProtocolTestBase public void authenticationBypassDisallowed() throws Exception { final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP); - try (FrameTransport transport = new FrameTransport(addr, true)) + try (FrameTransport transport = new FrameTransport(addr, true).connect()) { transport.sendProtocolHeader(SASL_AMQP_HEADER_BYTES); HeaderResponse saslHeaderResponse = transport.getNextResponse(HeaderResponse.class); @@ -217,7 +217,7 @@ public class SaslTest extends ProtocolTestBase public void clientSendsSaslMechanisms() throws Exception { final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP); - try (FrameTransport transport = new FrameTransport(addr, true)) + try (FrameTransport transport = new FrameTransport(addr, true).connect()) { transport.sendProtocolHeader(SASL_AMQP_HEADER_BYTES); HeaderResponse saslHeaderResponse = transport.getNextResponse(HeaderResponse.class); @@ -238,7 +238,7 @@ public class SaslTest extends ProtocolTestBase public void clientSendsSaslChallenge() throws Exception { final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP); - try (FrameTransport transport = new FrameTransport(addr, true)) + try (FrameTransport transport = new FrameTransport(addr, true).connect()) { transport.sendProtocolHeader(SASL_AMQP_HEADER_BYTES); HeaderResponse saslHeaderResponse = transport.getNextResponse(HeaderResponse.class); @@ -259,7 +259,7 @@ public class SaslTest extends ProtocolTestBase public void clientSendsSaslOutcome() throws Exception { final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP); - try (FrameTransport transport = new FrameTransport(addr, true)) + try (FrameTransport transport = new FrameTransport(addr, true).connect()) { transport.sendProtocolHeader(SASL_AMQP_HEADER_BYTES); HeaderResponse saslHeaderResponse = transport.getNextResponse(HeaderResponse.class); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java index d12bb0b..31a48af 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java @@ -50,7 +50,7 @@ public class BeginTest extends ProtocolTestBase public void emptyBegin() throws Exception { final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); - try(FrameTransport transport = new FrameTransport(addr)) + try(FrameTransport transport = new FrameTransport(addr).connect()) { transport.doOpenConnection(); Begin begin = new Begin(); @@ -72,7 +72,7 @@ public class BeginTest extends ProtocolTestBase public void successfulBegin() throws Exception { final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); - try (FrameTransport transport = new FrameTransport(addr)) + try (FrameTransport transport = new FrameTransport(addr).connect()) { transport.doOpenConnection(); Begin begin = new Begin(); @@ -103,7 +103,7 @@ public class BeginTest extends ProtocolTestBase public void channelMax() throws Exception { final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); - try (FrameTransport transport = new FrameTransport(addr)) + try (FrameTransport transport = new FrameTransport(addr).connect()) { UnsignedShort channelMax = UnsignedShort.valueOf((short) 5); transport.doProtocolNegotiation(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
