Author: rgodfrey
Date: Wed Jun 10 16:30:36 2015
New Revision: 1684707

URL: http://svn.apache.org/r1684707
Log:
QPID-6580 : Implement fairness in processing incoming data (work by Rob Godfrey 
and Lorenz Quack)

Added:
    
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java
   (with props)
Modified:
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java?rev=1684707&r1=1684706&r2=1684707&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
 Wed Jun 10 16:30:36 2015
@@ -40,10 +40,10 @@ class NetworkConnectionScheduler
     private final AtomicInteger _running = new AtomicInteger();
     private final int _poolSize;
 
-    NetworkConnectionScheduler(final SelectorThread selectorThread)
+    NetworkConnectionScheduler(final SelectorThread selectorThread, final 
NonBlockingNetworkTransport transport)
     {
         _selectorThread = selectorThread;
-        _poolSize = Runtime.getRuntime().availableProcessors();
+        _poolSize = transport.getThreadPoolSize();
         _executor = new ThreadPoolExecutor(_poolSize, _poolSize, 0L, 
TimeUnit.MILLISECONDS,
                                            new 
LinkedBlockingQueue<Runnable>(), new ThreadFactory()
         {
@@ -96,7 +96,7 @@ class NetworkConnectionScheduler
                 if (!closed)
                 {
 
-                    if (connection.isStateChanged())
+                    if (connection.isStateChanged() || 
connection.isPartialRead())
                     {
                         if (_running.get() == _poolSize)
                         {

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1684707&r1=1684706&r2=1684707&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
 Wed Jun 10 16:30:36 2015
@@ -87,6 +87,7 @@ public class NonBlockingConnection imple
     private boolean _workDone;
     private Certificate _peerCertificate;
 
+    private boolean _partialRead;
 
     public NonBlockingConnection(SocketChannel socketChannel,
                                  ServerProtocolEngine delegate,
@@ -147,6 +148,10 @@ public class NonBlockingConnection imple
         _remoteSocketAddress = 
_socketChannel.socket().getRemoteSocketAddress().toString();
     }
 
+    public boolean isPartialRead()
+    {
+        return _partialRead; // cleared at the start of doRead(), then set to !hasRemaining() of the buffer that was just read into
+    }
 
     Ticker getTicker()
     {
@@ -364,10 +369,10 @@ public class NonBlockingConnection imple
     private boolean doRead() throws IOException
     {
         boolean readData = false;
+        _partialRead = false;
         if(_transportEncryption == TransportEncryption.NONE)
         {
-            int remaining = 0;
-            while (remaining == 0 && !_closed.get())
+            if (!_closed.get())
             {
                 if (_currentBuffer == null || _currentBuffer.remaining() == 0)
                 {
@@ -382,7 +387,9 @@ public class NonBlockingConnection imple
                 {
                     _closed.set(true);
                 }
-                remaining = _currentBuffer.remaining();
+
+                _partialRead = !_currentBuffer.hasRemaining();
+
                 if (LOGGER.isDebugEnabled())
                 {
                     LOGGER.debug("Read " + read + " byte(s)");
@@ -395,10 +402,9 @@ public class NonBlockingConnection imple
         }
         else if(_transportEncryption == TransportEncryption.TLS)
         {
-            int read = 1;
-            while(!_closed.get() && read > 0  && 
_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_WRAP && 
(_status == null || _status.getStatus() != SSLEngineResult.Status.CLOSED))
+            if (!_closed.get() && _sslEngine.getHandshakeStatus() != 
SSLEngineResult.HandshakeStatus.NEED_WRAP && (_status == null || 
_status.getStatus() != SSLEngineResult.Status.CLOSED))
             {
-                read = _socketChannel.read(_netInputBuffer);
+                int read = _socketChannel.read(_netInputBuffer);
                 if (read == -1)
                 {
                     _closed.set(true);
@@ -407,6 +413,9 @@ public class NonBlockingConnection imple
                 {
                     readData = true;
                 }
+
+                _partialRead = !_netInputBuffer.hasRemaining();
+
                 if (LOGGER.isDebugEnabled())
                 {
                     LOGGER.debug("Read " + read + " encrypted bytes ");

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1684707&r1=1684706&r2=1684707&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
 Wed Jun 10 16:30:36 2015
@@ -200,6 +200,10 @@ public class NonBlockingNetworkTransport
 
                 success = true;
             }
+            else
+            {
+                LOGGER.error("No Engine available.");
+            }
         }
         catch (IOException e)
         {
@@ -221,5 +225,8 @@ public class NonBlockingNetworkTransport
         }
     }
 
-
+    public int getThreadPoolSize()
+    {
+        return _config.getThreadPoolSize(); // forwarded from _config; NetworkConnectionScheduler sizes its executor with this value
+    }
 }

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1684707&r1=1684706&r2=1684707&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
 Wed Jun 10 16:30:36 2015
@@ -56,7 +56,7 @@ public class SelectorThread extends Thre
 
     private final Selector _selector;
     private final AtomicBoolean _closed = new AtomicBoolean();
-    private final NetworkConnectionScheduler _scheduler = new 
NetworkConnectionScheduler(this);
+    private final NetworkConnectionScheduler _scheduler;
     private final NonBlockingNetworkTransport _transport;
     private long _nextTimeout;
 
@@ -66,6 +66,7 @@ public class SelectorThread extends Thre
 
         _transport = nonBlockingNetworkTransport;
         _selector = Selector.open();
+        _scheduler = new NetworkConnectionScheduler(this, _transport);
     }
 
     public void addAcceptingSocket(final ServerSocketChannel socketChannel)

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java?rev=1684707&r1=1684706&r2=1684707&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
 Wed Jun 10 16:30:36 2015
@@ -160,6 +160,12 @@ class TCPandSSLTransport implements Acce
         }
 
         @Override
+        public int getThreadPoolSize()
+        {
+            return Runtime.getRuntime().availableProcessors(); // same value NetworkConnectionScheduler computed itself before this change
+        }
+
+        @Override
         public int getReceiveBufferSize()
         {
             return _port.getReceiveBufferSize();

Added: 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java?rev=1684707&view=auto
==============================================================================
--- 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java
 (added)
+++ 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java
 Wed Jun 10 16:30:36 2015
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.transport;
+
+import org.apache.qpid.server.protocol.MultiVersionProtocolEngine;
+import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.transport.NetworkTransportConfiguration;
+import org.apache.qpid.transport.network.AggregateTicker;
+import org.apache.qpid.transport.network.TransportEncryption;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+public class NetworkConnectionSchedulerTest extends QpidTestCase // checks that a connection writing continuously does not stop a second connection's data from being received
+{
+    private volatile boolean _keepRunningThreads = true; // polled by the verbose sender thread; cleared by testFairRead on the test thread
+
+    public void testFairRead() throws IOException, InterruptedException
+    {
+        NetworkTransportConfiguration config = new 
NetworkTransportConfiguration()
+        {
+
+            @Override
+            public boolean getTcpNoDelay()
+            {
+                return true;
+            }
+
+            @Override
+            public int getReceiveBufferSize()
+            {
+                return 1; // 1-byte receive buffer; see the note on the verify() call below
+            }
+
+            @Override
+            public int getSendBufferSize()
+            {
+                return 1;
+            }
+
+            @Override
+            public int getThreadPoolSize()
+            {
+                return 1; // NetworkConnectionScheduler builds its executor with this size, so both connections share one worker thread
+            }
+
+            @Override
+            public InetSocketAddress getAddress()
+            {
+                return new InetSocketAddress(0); // port 0: the system picks a free port when binding
+            }
+
+            @Override
+            public boolean needClientAuth()
+            {
+                return false;
+            }
+
+            @Override
+            public boolean wantClientAuth()
+            {
+                return false;
+            }
+
+            @Override
+            public Collection<String> getEnabledCipherSuites()
+            {
+                return Collections.emptyList();
+            }
+
+            @Override
+            public Collection<String> getDisabledCipherSuites()
+            {
+                return Collections.emptyList();
+            }
+        };
+        MultiVersionProtocolEngineFactory engineFactory = 
mock(MultiVersionProtocolEngineFactory.class);
+        MultiVersionProtocolEngine verboseEngine = 
mock(MultiVersionProtocolEngine.class);
+        MultiVersionProtocolEngine timidEngine = 
mock(MultiVersionProtocolEngine.class);
+
+        
when(engineFactory.newProtocolEngine(any(SocketAddress.class))).thenReturn(verboseEngine).thenReturn(timidEngine); // first call yields verboseEngine, every later call yields timidEngine
+        when(verboseEngine.getAggregateTicker()).thenReturn(new 
AggregateTicker());
+        when(timidEngine.getAggregateTicker()).thenReturn(new 
AggregateTicker());
+
+        NonBlockingNetworkTransport transport = new 
NonBlockingNetworkTransport(config, engineFactory, null, 
EnumSet.of(TransportEncryption.NONE));
+
+        transport.start();
+        final int port = transport.getAcceptingPort(); // port both client sockets below connect to
+
+        Socket verboseSocket = new Socket();
+        verboseSocket.connect(new InetSocketAddress(port)); // connects first, so presumably this connection is paired with verboseEngine — TODO confirm
+        final OutputStream verboseOutputStream = 
verboseSocket.getOutputStream();
+        Thread verboseSender = new Thread(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                try
+                {
+                    while (_keepRunningThreads)
+                    {
+                        verboseOutputStream.write("Hello World".getBytes()); // keeps writing until the flag is cleared
+                    }
+                }
+                catch (IOException e)
+                {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        Socket timidSocket = new Socket();
+        timidSocket.connect(new InetSocketAddress(port));
+        final OutputStream timidOutputStream = timidSocket.getOutputStream();
+        Thread timidSender = new Thread(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                try
+                {
+                    timidOutputStream.write("me too".getBytes()); // a single 6-byte write, then the thread ends
+                }
+                catch (IOException e)
+                {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        verboseSender.start();
+        Thread.sleep(500l); // let the verbose sender run alone before the timid one starts
+        timidSender.start();
+        Thread.sleep(1000l); // fixed wait before checking what the timid engine received
+        verify(timidEngine, atLeast(6)).received(any(ByteBuffer.class)); // "me too" is 6 bytes; presumably the 1-byte receive buffer yields one received() call per byte — TODO confirm
+        _keepRunningThreads = false; // NOTE(review): not reached if verify() throws, so the flag stays true; the sockets and transport are also never closed in this method
+    }
+
+
+}

Propchange: 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java?rev=1684707&r1=1684706&r2=1684707&view=diff
==============================================================================
--- 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java
 (original)
+++ 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java
 Wed Jun 10 16:30:36 2015
@@ -39,6 +39,8 @@ public interface NetworkTransportConfigu
     // The amount of memory in bytes to allocate to the outgoing buffer
     int getSendBufferSize();
 
+    int getThreadPoolSize();
+
     InetSocketAddress getAddress();
 
     boolean needClientAuth();



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to