Repository: qpid-broker-j Updated Branches: refs/heads/master e4598dcd6 -> 9713e49a0
QPID-7817: [WebSocket] Fix defect in buffer handling Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/9713e49a Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/9713e49a Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/9713e49a Branch: refs/heads/master Commit: 9713e49a0b12fa4a544b2ff92222446fd74416e2 Parents: e4598dc Author: Keith Wall <[email protected]> Authored: Fri Jun 16 07:38:33 2017 +0100 Committer: Keith Wall <[email protected]> Committed: Fri Jun 16 07:39:08 2017 +0100 ---------------------------------------------------------------------- .../transport/websocket/WebSocketProvider.java | 48 +++++++++++++++++++- 1 file changed, 47 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9713e49a/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java ---------------------------------------------------------------------- diff --git a/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java b/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java index 82ee3eb..76b3fbd 100644 --- a/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java +++ b/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java @@ -363,6 +363,7 @@ class WebSocketProvider implements AcceptingTransport private volatile QpidByteBuffer _netInputBuffer; private volatile MultiVersionProtocolEngine _protocolEngine; private volatile ConnectionWrapper _connectionWrapper; + private volatile boolean _unexpectedByteBufferSizeReported; AmqpWebSocket() { @@ -422,7 +423,7 @@ class WebSocketProvider implements AcceptingTransport _netInputBuffer.flip(); _protocolEngine.received(_netInputBuffer); _connectionWrapper.doWrite(); - _netInputBuffer.compact(); + restoreApplicationBufferForWrite(); } while(remaining > 0); @@ -439,6 +440,51 @@ class WebSocketProvider implements AcceptingTransport _idleTimeoutChecker.wakeup(); } + private void restoreApplicationBufferForWrite() + { + QpidByteBuffer oldNetInputBuffer = _netInputBuffer; + int unprocessedDataLength = _netInputBuffer.remaining(); + + _netInputBuffer.limit(_netInputBuffer.capacity()); + _netInputBuffer = oldNetInputBuffer.slice(); + _netInputBuffer.limit(unprocessedDataLength); + oldNetInputBuffer.dispose(); + if (_netInputBuffer.limit() != _netInputBuffer.capacity()) + { + _netInputBuffer.position(_netInputBuffer.limit()); + _netInputBuffer.limit(_netInputBuffer.capacity()); + } + else + { + QpidByteBuffer currentBuffer = _netInputBuffer; + int newBufSize; + + if (currentBuffer.capacity() < _broker.getNetworkBufferSize()) + { + newBufSize = _broker.getNetworkBufferSize(); + } + else + { + newBufSize = currentBuffer.capacity() + _broker.getNetworkBufferSize(); + reportUnexpectedByteBufferSizeUsage(); + } + + _netInputBuffer = QpidByteBuffer.allocateDirect(newBufSize); + _netInputBuffer.put(currentBuffer); + currentBuffer.dispose(); + } + } + + private void reportUnexpectedByteBufferSizeUsage() + { + if (!_unexpectedByteBufferSizeReported) + { + LOGGER.info("At least one frame unexpectedly does not fit into default byte buffer size ({}B) on a connection {}.", + _broker.getNetworkBufferSize(), this.toString()); + _unexpectedByteBufferSizeReported = true; + } + } + /** AMQP frames MUST be sent as binary data payloads of WebSocket messages.*/ @OnWebSocketMessage @SuppressWarnings("unused") public void onWebSocketText(Session sess, String text) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
