Author: orudyy
Date: Fri Nov 13 15:51:51 2015
New Revision: 1714224

URL: http://svn.apache.org/viewvc?rev=1714224&view=rev
Log:
QPID-6788: Allow web socket connection to suspend message assignment on 
reaching the outbound message network buffer limit

Modified:
    
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java

Modified: 
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1714224&r1=1714223&r2=1714224&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
 Fri Nov 13 15:51:51 2015
@@ -32,6 +32,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.net.ssl.SSLContext;
 import javax.servlet.ServletException;
@@ -39,7 +40,6 @@ import javax.servlet.http.HttpServletReq
 import javax.servlet.http.HttpServletResponse;
 
 import org.eclipse.jetty.server.AbstractConnector;
-import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.Request;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.handler.AbstractHandler;
@@ -49,6 +49,8 @@ import org.eclipse.jetty.util.ssl.SslCon
 import org.eclipse.jetty.util.thread.ThreadPool;
 import org.eclipse.jetty.websocket.WebSocket;
 import org.eclipse.jetty.websocket.WebSocketHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.transport.MultiVersionProtocolEngine;
@@ -67,6 +69,7 @@ import org.apache.qpid.transport.network
 
 class WebSocketProvider implements AcceptingTransport
 {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(WebSocketProvider.class);
     public static final String AMQP_WEBSOCKET_SUBPROTOCOL = "AMQPWSB10";
     public static final String X509_CERTIFICATES = 
"javax.servlet.request.X509Certificate";
     private final Transport _transport;
@@ -76,6 +79,7 @@ class WebSocketProvider implements Accep
     private final Protocol _defaultSupportedProtocolReply;
     private final MultiVersionProtocolEngineFactory _factory;
     private Server _server;
+    private final long _outboundMessageBufferLimit;
 
     WebSocketProvider(final Transport transport,
                       final SSLContext sslContext,
@@ -88,6 +92,9 @@ class WebSocketProvider implements Accep
         _port = port;
         _supported = supported;
         _defaultSupportedProtocolReply = defaultSupportedProtocolReply;
+
+        _outboundMessageBufferLimit = (long) _port.getContextValue(Long.class,
+                                                                   
AmqpPort.PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE);
         _factory = new MultiVersionProtocolEngineFactory(
                         _port.getParent(Broker.class),
                         _supported,
@@ -236,22 +243,30 @@ class WebSocketProvider implements Accep
             {
 
                 _protocolEngine.clearWork();
-                _protocolEngine.setMessageAssignmentSuspended(true);
-                Iterator<Runnable> iter = 
_protocolEngine.processPendingIterator();
-                while(iter.hasNext())
+                try
                 {
-                    iter.next().run();
-                }
+                    _protocolEngine.setIOThread(Thread.currentThread());
+                    _protocolEngine.setMessageAssignmentSuspended(true);
+                    Iterator<Runnable> iter = 
_protocolEngine.processPendingIterator();
+                    while(iter.hasNext())
+                    {
+                        iter.next().run();
+                    }
 
-                QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(length);
-                buffer.put(data,offset,length);
-                buffer.flip();
-                _protocolEngine.received(buffer);
-                buffer.dispose();
+                    QpidByteBuffer buffer = 
QpidByteBuffer.allocateDirect(length);
+                    buffer.put(data, offset, length);
+                    buffer.flip();
+                    _protocolEngine.received(buffer);
+                    buffer.dispose();
 
-                _connectionWrapper.doWrite();
+                    _connectionWrapper.doWrite();
 
-                _protocolEngine.setMessageAssignmentSuspended(false);
+                    _protocolEngine.setMessageAssignmentSuspended(false);
+                }
+                finally
+                {
+                    _protocolEngine.setIOThread(null);
+                }
             }
         }
 
@@ -300,6 +315,7 @@ class WebSocketProvider implements Accep
         private final SocketAddress _remoteAddress;
         private final ConcurrentLinkedQueue<QpidByteBuffer> _buffers = new 
ConcurrentLinkedQueue<>();
         private final MultiVersionProtocolEngine _protocolEngine;
+        private final AtomicLong _usedOutboundMessageSpace = new AtomicLong();
 
         private Certificate _certificate;
         private int _maxWriteIdle;
@@ -400,7 +416,10 @@ class WebSocketProvider implements Accep
         @Override
         public void reserveOutboundMessageSpace(final long size)
         {
-            // TODO
+            if (_usedOutboundMessageSpace.addAndGet(size) > 
_outboundMessageBufferLimit)
+            {
+                _protocolEngine.setMessageAssignmentSuspended(true);
+            }
         }
 
         void setPeerCertificate(final Certificate certificate)
@@ -434,9 +453,11 @@ class WebSocketProvider implements Accep
                 try
                 {
                     _connection.sendMessage(data, 0, size);
+                    _usedOutboundMessageSpace.set(0);
                 }
                 catch (IOException e)
                 {
+                    LOGGER.info("Exception on write: {}", e.getMessage());
                     close();
                 }
             }
@@ -445,17 +466,25 @@ class WebSocketProvider implements Accep
         public synchronized void doWork()
         {
             _protocolEngine.clearWork();
-            _protocolEngine.setMessageAssignmentSuspended(true);
-
-            Iterator<Runnable> iter = _protocolEngine.processPendingIterator();
-            while(iter.hasNext())
+            try
             {
-                iter.next().run();
-            }
+                _protocolEngine.setIOThread(Thread.currentThread());
+                _protocolEngine.setMessageAssignmentSuspended(true);
+
+                Iterator<Runnable> iter = 
_protocolEngine.processPendingIterator();
+                while(iter.hasNext())
+                {
+                    iter.next().run();
+                }
 
-            doWrite();
+                doWrite();
 
-            _protocolEngine.setMessageAssignmentSuspended(false);
+                _protocolEngine.setMessageAssignmentSuspended(false);
+            }
+            finally
+            {
+                _protocolEngine.setIOThread(null);
+            }
 
         }
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to