Author: orudyy
Date: Fri Nov 13 15:51:51 2015
New Revision: 1714224
URL: http://svn.apache.org/viewvc?rev=1714224&view=rev
Log:
QPID-6788: Allow web socket connection to suspend message assignment on
reaching the outbound message network buffer limit
Modified:
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
Modified:
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1714224&r1=1714223&r2=1714224&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
(original)
+++
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
Fri Nov 13 15:51:51 2015
@@ -32,6 +32,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
import javax.net.ssl.SSLContext;
import javax.servlet.ServletException;
@@ -39,7 +40,6 @@ import javax.servlet.http.HttpServletReq
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.AbstractConnector;
-import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;
@@ -49,6 +49,8 @@ import org.eclipse.jetty.util.ssl.SslCon
import org.eclipse.jetty.util.thread.ThreadPool;
import org.eclipse.jetty.websocket.WebSocket;
import org.eclipse.jetty.websocket.WebSocketHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.transport.MultiVersionProtocolEngine;
@@ -67,6 +69,7 @@ import org.apache.qpid.transport.network
class WebSocketProvider implements AcceptingTransport
{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(WebSocketProvider.class);
public static final String AMQP_WEBSOCKET_SUBPROTOCOL = "AMQPWSB10";
public static final String X509_CERTIFICATES =
"javax.servlet.request.X509Certificate";
private final Transport _transport;
@@ -76,6 +79,7 @@ class WebSocketProvider implements Accep
private final Protocol _defaultSupportedProtocolReply;
private final MultiVersionProtocolEngineFactory _factory;
private Server _server;
+ private final long _outboundMessageBufferLimit;
WebSocketProvider(final Transport transport,
final SSLContext sslContext,
@@ -88,6 +92,9 @@ class WebSocketProvider implements Accep
_port = port;
_supported = supported;
_defaultSupportedProtocolReply = defaultSupportedProtocolReply;
+
+ _outboundMessageBufferLimit = (long) _port.getContextValue(Long.class,
+
AmqpPort.PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE);
_factory = new MultiVersionProtocolEngineFactory(
_port.getParent(Broker.class),
_supported,
@@ -236,22 +243,30 @@ class WebSocketProvider implements Accep
{
_protocolEngine.clearWork();
- _protocolEngine.setMessageAssignmentSuspended(true);
- Iterator<Runnable> iter =
_protocolEngine.processPendingIterator();
- while(iter.hasNext())
+ try
{
- iter.next().run();
- }
+ _protocolEngine.setIOThread(Thread.currentThread());
+ _protocolEngine.setMessageAssignmentSuspended(true);
+ Iterator<Runnable> iter =
_protocolEngine.processPendingIterator();
+ while(iter.hasNext())
+ {
+ iter.next().run();
+ }
- QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(length);
- buffer.put(data,offset,length);
- buffer.flip();
- _protocolEngine.received(buffer);
- buffer.dispose();
+ QpidByteBuffer buffer =
QpidByteBuffer.allocateDirect(length);
+ buffer.put(data, offset, length);
+ buffer.flip();
+ _protocolEngine.received(buffer);
+ buffer.dispose();
- _connectionWrapper.doWrite();
+ _connectionWrapper.doWrite();
- _protocolEngine.setMessageAssignmentSuspended(false);
+ _protocolEngine.setMessageAssignmentSuspended(false);
+ }
+ finally
+ {
+ _protocolEngine.setIOThread(null);
+ }
}
}
@@ -300,6 +315,7 @@ class WebSocketProvider implements Accep
private final SocketAddress _remoteAddress;
private final ConcurrentLinkedQueue<QpidByteBuffer> _buffers = new
ConcurrentLinkedQueue<>();
private final MultiVersionProtocolEngine _protocolEngine;
+ private final AtomicLong _usedOutboundMessageSpace = new AtomicLong();
private Certificate _certificate;
private int _maxWriteIdle;
@@ -400,7 +416,10 @@ class WebSocketProvider implements Accep
@Override
public void reserveOutboundMessageSpace(final long size)
{
- // TODO
+ if (_usedOutboundMessageSpace.addAndGet(size) >
_outboundMessageBufferLimit)
+ {
+ _protocolEngine.setMessageAssignmentSuspended(true);
+ }
}
void setPeerCertificate(final Certificate certificate)
@@ -434,9 +453,11 @@ class WebSocketProvider implements Accep
try
{
_connection.sendMessage(data, 0, size);
+ _usedOutboundMessageSpace.set(0);
}
catch (IOException e)
{
+ LOGGER.info("Exception on write: {}", e.getMessage());
close();
}
}
@@ -445,17 +466,25 @@ class WebSocketProvider implements Accep
public synchronized void doWork()
{
_protocolEngine.clearWork();
- _protocolEngine.setMessageAssignmentSuspended(true);
-
- Iterator<Runnable> iter = _protocolEngine.processPendingIterator();
- while(iter.hasNext())
+ try
{
- iter.next().run();
- }
+ _protocolEngine.setIOThread(Thread.currentThread());
+ _protocolEngine.setMessageAssignmentSuspended(true);
+
+ Iterator<Runnable> iter =
_protocolEngine.processPendingIterator();
+ while(iter.hasNext())
+ {
+ iter.next().run();
+ }
- doWrite();
+ doWrite();
- _protocolEngine.setMessageAssignmentSuspended(false);
+ _protocolEngine.setMessageAssignmentSuspended(false);
+ }
+ finally
+ {
+ _protocolEngine.setIOThread(null);
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]