Repository: qpid-broker-j Updated Branches: refs/heads/master ce077c25c -> 19d6b838e
QPID-7817: [WebSocket] Change implementation to allow an AMQP frame to cross a web-socket frame boundary Corrected the supporting protocol test too and removed the exclusion. Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/19d6b838 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/19d6b838 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/19d6b838 Branch: refs/heads/master Commit: 19d6b838e5e96bcbdf6816eacbfd085697041a99 Parents: ce077c2 Author: Keith Wall <[email protected]> Authored: Mon Jun 12 07:34:23 2017 +0100 Committer: Keith Wall <[email protected]> Committed: Tue Jun 13 15:50:13 2017 +0100 ---------------------------------------------------------------------- .../protocol/v1_0/type/UnsignedShort.java | 5 +- .../transport/websocket/WebSocketProvider.java | 69 ++++++++++++++------ .../websocket/WebSocketFrameTransport.java | 47 +++++++------ .../extensions/websocket/WebSocketTest.java | 21 +++--- 4 files changed, 88 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/19d6b838/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/UnsignedShort.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/UnsignedShort.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/UnsignedShort.java index c0b828f..439cd43 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/UnsignedShort.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/UnsignedShort.java @@ -28,7 +28,6 @@ public final class UnsignedShort extends Number implements Comparable<UnsignedSh private final short _underlying; private static final UnsignedShort[] cachedValues = new UnsignedShort[256]; public static final UnsignedShort MAX_VALUE = new UnsignedShort((short) 0xffff); - static { for(short i = 0; i < 256; i++) @@ -37,6 +36,8 @@ public final class UnsignedShort extends Number implements Comparable<UnsignedSh } } + public static final UnsignedShort ZERO = UnsignedShort.valueOf((short) 0); + private UnsignedShort(short underlying) { _underlying = underlying; @@ -56,7 +57,7 @@ public final class UnsignedShort extends Number implements Comparable<UnsignedSh @Override public long longValue() { - return ((long) _underlying) & 0xFFFFl; + return ((long) _underlying) & 0xFFFFL; } @Override http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/19d6b838/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java ---------------------------------------------------------------------- diff --git a/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java b/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java index 7784dac..ba66d1e 100644 --- a/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java +++ b/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java @@ -50,13 +50,15 @@ import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.handler.AbstractHandler; -import org.eclipse.jetty.util.annotation.Name; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.ThreadPool; import org.eclipse.jetty.websocket.api.Session; -import org.eclipse.jetty.websocket.api.WebSocketAdapter; import org.eclipse.jetty.websocket.api.WebSocketPolicy; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; +import org.eclipse.jetty.websocket.api.annotations.WebSocket; import org.eclipse.jetty.websocket.server.WebSocketHandler; import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; @@ -88,6 +90,7 @@ class WebSocketProvider implements AcceptingTransport private final Transport _transport; private final SSLContext _sslContext; private final AmqpPort<?> _port; + private final Broker<?> _broker; private final Set<Protocol> _supported; private final Protocol _defaultSupportedProtocolReply; private final MultiVersionProtocolEngineFactory _factory; @@ -108,11 +111,12 @@ class WebSocketProvider implements AcceptingTransport _transport = transport; _sslContext = sslContext; _port = port; + _broker = ((Broker<?>) port.getParent()); _supported = supported; _defaultSupportedProtocolReply = defaultSupportedProtocolReply; _factory = new MultiVersionProtocolEngineFactory( - (Broker<?>) _port.getParent(), + _broker, _supported, _defaultSupportedProtocolReply, _port, @@ -353,16 +357,21 @@ class WebSocketProvider implements AcceptingTransport ((ServerConnector) server.getConnectors()[0]).getLocalPort(); } - public class AmqpWebSocket extends WebSocketAdapter + @WebSocket + public class AmqpWebSocket { + private volatile QpidByteBuffer _netInputBuffer; private volatile MultiVersionProtocolEngine _protocolEngine; private volatile ConnectionWrapper _connectionWrapper; - @Override - public void onWebSocketConnect(final Session session) + AmqpWebSocket() { - super.onWebSocketConnect(session); + _netInputBuffer = QpidByteBuffer.allocateDirect(_broker.getNetworkBufferSize()); + } + @OnWebSocketConnect @SuppressWarnings("unused") + public void onWebSocketConnect(final Session session) + { SocketAddress localAddress = session.getLocalAddress(); SocketAddress remoteAddress = session.getRemoteAddress(); _protocolEngine = _factory.newProtocolEngine(remoteAddress); @@ -386,12 +395,11 @@ class WebSocketProvider implements AcceptingTransport } - @Override - public void onWebSocketBinary(final byte[] data, final int offset, final int length) + @OnWebSocketMessage @SuppressWarnings("unused") + public void onWebSocketBinary(Session sess, final byte[] payload, int offset, final int len) { synchronized (_connectionWrapper) { - _protocolEngine.clearWork(); try { @@ -402,13 +410,26 @@ class WebSocketProvider implements AcceptingTransport iter.next().run(); } - for (QpidByteBuffer qpidByteBuffer : QpidByteBuffer.asQpidByteBuffers(data, offset, length)) + int lastRead; + int remaining = len; + do { - _protocolEngine.received(qpidByteBuffer); - qpidByteBuffer.dispose(); + int chunkLen = Math.min(remaining, _netInputBuffer.remaining()); + _netInputBuffer.put(payload, offset, chunkLen); + remaining =- chunkLen; + offset =+ chunkLen; + + _netInputBuffer.flip(); + _protocolEngine.received(_netInputBuffer); + _connectionWrapper.doWrite(); + _netInputBuffer.compact(); } + while(remaining > 0); - _connectionWrapper.doWrite(); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Read {} byte(s)", len); + } } finally { @@ -416,17 +437,17 @@ class WebSocketProvider implements AcceptingTransport } } _idleTimeoutChecker.wakeup(); - } - @Override - public void onWebSocketError(final Throwable cause) + /** AMQP frames MUST be sent as binary data payloads of WebSocket messages.*/ + @OnWebSocketMessage @SuppressWarnings("unused") + public void onWebSocketText(Session sess, String text) { - super.onWebSocketError(cause); - LOGGER.error("onWebSocketError", cause); + LOGGER.info("Unexpected websocket text message received, closing connection"); + sess.close(); } - @Override + @OnWebSocketClose @SuppressWarnings("unused") public void onWebSocketClose(final int statusCode, final String reason) { if (_protocolEngine != null) @@ -435,6 +456,7 @@ class WebSocketProvider implements AcceptingTransport } _activeConnections.remove(_connectionWrapper); _idleTimeoutChecker.wakeup(); + _netInputBuffer.dispose(); } } @@ -486,7 +508,6 @@ class WebSocketProvider implements AcceptingTransport @Override public void start() { - } @Override @@ -605,6 +626,8 @@ class WebSocketProvider implements AcceptingTransport QpidByteBuffer buf; while((buf = _buffers.poll())!= null) { + // TODO: For efficiency perhaps only coalesce sequential small buffers and let large buffers + // go alone in a binary message. This would likely avoid the memory copies of large transfer payloads size += buf.remaining(); toBeWritten.add(buf); } @@ -624,6 +647,10 @@ class WebSocketProvider implements AcceptingTransport try { _connection.getRemote().sendBytes(ByteBuffer.wrap(data)); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Written {} byte(s)", data.length); + } } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/19d6b838/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketFrameTransport.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketFrameTransport.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketFrameTransport.java index b5ccd08..00150ba 100644 --- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketFrameTransport.java +++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketFrameTransport.java @@ -53,27 +53,24 @@ public class WebSocketFrameTransport extends FrameTransport { private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketFrameTransport.class); - private WebSocketFramingOutputHandler _webSocketFramingOutputHandler; - private WebSocketDeframingInputHandler _webSocketDeframingInputHandler; - private WebSocketClientHandler _webSocketClientHandler; + private final WebSocketFramingOutputHandler _webSocketFramingOutputHandler = new WebSocketFramingOutputHandler(); + private final WebSocketDeframingInputHandler _webSocketDeframingInputHandler = new WebSocketDeframingInputHandler(); + private final WebSocketClientHandler _webSocketClientHandler; public WebSocketFrameTransport(final InetSocketAddress addr) { super(addr); - } - - @Override - protected void buildInputOutputPipeline(final ChannelPipeline pipeline) - { URI uri = URI.create(String.format("tcp://%s:%d/", getBrokerAddress().getHostString(), getBrokerAddress().getPort())); _webSocketClientHandler = new WebSocketClientHandler( WebSocketClientHandshakerFactory.newHandshaker( uri, WebSocketVersion.V13, "amqp", false, new DefaultHttpHeaders()), uri); - _webSocketFramingOutputHandler = new WebSocketFramingOutputHandler(); - _webSocketDeframingInputHandler = new WebSocketDeframingInputHandler(); + } + @Override + protected void buildInputOutputPipeline(final ChannelPipeline pipeline) + { pipeline.addLast(new HttpClientCodec()); pipeline.addLast(new HttpObjectAggregator(65536)); pipeline.addLast(_webSocketClientHandler); @@ -105,26 +102,36 @@ public class WebSocketFrameTransport extends FrameTransport { if (msg instanceof ByteBuf) { - final ByteBuf buf = ((ByteBuf) msg); + final ByteBuf buf = ((ByteBuf) msg).retain(); + if (_splitFrames) { - buf.forEachByte(b -> - { - ByteBuf byteBuf = Unpooled.copiedBuffer(new byte[] {b}); - BinaryWebSocketFrame frame = new BinaryWebSocketFrame(byteBuf); - ctx.write(frame, promise); - return false; - }); + while(buf.isReadable()) + { + + byte b = buf.readByte(); + BinaryWebSocketFrame frame = new BinaryWebSocketFrame(Unpooled.wrappedBuffer(new byte[] {b})); + if (buf.isReadable()) + { + ctx.writeAndFlush(frame); + } + else + { + ctx.writeAndFlush(frame, promise); + } + } + + buf.release(); } else { BinaryWebSocketFrame frame = new BinaryWebSocketFrame((ByteBuf) msg); - ctx.write(frame, promise); + ctx.writeAndFlush(frame, promise); } } else { - ctx.write(msg, promise); + ctx.writeAndFlush(msg, promise); } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/19d6b838/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java index c0d1a52..f6d952f 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java @@ -32,7 +32,6 @@ import static org.junit.Assert.assertArrayEquals; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; -import org.junit.Ignore; import org.junit.Test; import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; @@ -46,6 +45,9 @@ import org.apache.qpid.tests.protocol.v1_0.SpecificationTest; public class WebSocketTest extends ProtocolTestBase { + + public static final byte[] AMQP_HEADER = "AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8); + @Test @SpecificationTest(section = "2.1", description = "Opening a WebSocket Connection") public void protocolHeader() throws Exception @@ -53,29 +55,26 @@ public class WebSocketTest extends ProtocolTestBase final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQPWS); try (FrameTransport transport = new WebSocketFrameTransport(addr).connect()) { - byte[] bytes = "AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8); - transport.sendProtocolHeader(bytes); + transport.sendProtocolHeader(AMQP_HEADER); HeaderResponse response = transport.getNextResponse(); - assertArrayEquals("Unexpected protocol header response", bytes, response.getBody()); + assertArrayEquals("Unexpected protocol header response", AMQP_HEADER, response.getBody()); } } @Test @SpecificationTest(section = "2.4", description = "[...] a single AMQP frame MAY be split over one or more consecutive WebSocket messages. ") - @Ignore("QPID-7817") public void amqpFramesSplitOverManyWebSocketFrames() throws Exception { final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQPWS); try (FrameTransport transport = new WebSocketFrameTransport(addr).splitAmqpFrames().connect()) { - byte[] bytes = "AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8); - transport.sendProtocolHeader(bytes); - HeaderResponse response = transport.getNextResponse(); - assertArrayEquals("Unexpected protocol header response", bytes, response.getBody()); + transport.sendProtocolHeader(AMQP_HEADER); + HeaderResponse response = transport.getNextResponse(HeaderResponse.class); + assertArrayEquals("Unexpected protocol header response", AMQP_HEADER, response.getBody()); Open open = new Open(); open.setContainerId("testContainerId"); - transport.sendPerformative(open, UnsignedShort.valueOf((short) 0)); + transport.sendPerformative(open, UnsignedShort.ZERO); Open responseOpen = transport.getNextResponseBody(Open.class); assertThat(responseOpen.getContainerId(), is(notNullValue())); @@ -99,7 +98,7 @@ public class WebSocketTest extends ProtocolTestBase Open open = new Open(); open.setContainerId("testContainerId"); - transport.sendPerformative(open, UnsignedShort.valueOf((short) 0)); + transport.sendPerformative(open, UnsignedShort.ZERO); Open responseOpen = transport.getNextResponseBody(Open.class); assertThat(responseOpen.getContainerId(), is(notNullValue())); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
