Author: rgodfrey
Date: Wed Jun 10 16:30:36 2015
New Revision: 1684707
URL: http://svn.apache.org/r1684707
Log:
QPID-6580 : Implement fairness in processing incoming data (work by Rob Godfrey
and Lorenz Quack)
Added:
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java
(with props)
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java?rev=1684707&r1=1684706&r2=1684707&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
Wed Jun 10 16:30:36 2015
@@ -40,10 +40,10 @@ class NetworkConnectionScheduler
private final AtomicInteger _running = new AtomicInteger();
private final int _poolSize;
- NetworkConnectionScheduler(final SelectorThread selectorThread)
+ NetworkConnectionScheduler(final SelectorThread selectorThread, final
NonBlockingNetworkTransport transport)
{
_selectorThread = selectorThread;
- _poolSize = Runtime.getRuntime().availableProcessors();
+ _poolSize = transport.getThreadPoolSize();
_executor = new ThreadPoolExecutor(_poolSize, _poolSize, 0L,
TimeUnit.MILLISECONDS,
new
LinkedBlockingQueue<Runnable>(), new ThreadFactory()
{
@@ -96,7 +96,7 @@ class NetworkConnectionScheduler
if (!closed)
{
- if (connection.isStateChanged())
+ if (connection.isStateChanged() ||
connection.isPartialRead())
{
if (_running.get() == _poolSize)
{
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1684707&r1=1684706&r2=1684707&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
Wed Jun 10 16:30:36 2015
@@ -87,6 +87,7 @@ public class NonBlockingConnection imple
private boolean _workDone;
private Certificate _peerCertificate;
+ private boolean _partialRead;
public NonBlockingConnection(SocketChannel socketChannel,
ServerProtocolEngine delegate,
@@ -147,6 +148,10 @@ public class NonBlockingConnection imple
_remoteSocketAddress =
_socketChannel.socket().getRemoteSocketAddress().toString();
}
+ public boolean isPartialRead()
+ {
+ return _partialRead;
+ }
Ticker getTicker()
{
@@ -364,10 +369,10 @@ public class NonBlockingConnection imple
private boolean doRead() throws IOException
{
boolean readData = false;
+ _partialRead = false;
if(_transportEncryption == TransportEncryption.NONE)
{
- int remaining = 0;
- while (remaining == 0 && !_closed.get())
+ if (!_closed.get())
{
if (_currentBuffer == null || _currentBuffer.remaining() == 0)
{
@@ -382,7 +387,9 @@ public class NonBlockingConnection imple
{
_closed.set(true);
}
- remaining = _currentBuffer.remaining();
+
+ _partialRead = !_currentBuffer.hasRemaining();
+
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Read " + read + " byte(s)");
@@ -395,10 +402,9 @@ public class NonBlockingConnection imple
}
else if(_transportEncryption == TransportEncryption.TLS)
{
- int read = 1;
- while(!_closed.get() && read > 0 &&
_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_WRAP &&
(_status == null || _status.getStatus() != SSLEngineResult.Status.CLOSED))
+ if (!_closed.get() && _sslEngine.getHandshakeStatus() !=
SSLEngineResult.HandshakeStatus.NEED_WRAP && (_status == null ||
_status.getStatus() != SSLEngineResult.Status.CLOSED))
{
- read = _socketChannel.read(_netInputBuffer);
+ int read = _socketChannel.read(_netInputBuffer);
if (read == -1)
{
_closed.set(true);
@@ -407,6 +413,9 @@ public class NonBlockingConnection imple
{
readData = true;
}
+
+ _partialRead = !_netInputBuffer.hasRemaining();
+
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Read " + read + " encrypted bytes ");
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1684707&r1=1684706&r2=1684707&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
Wed Jun 10 16:30:36 2015
@@ -200,6 +200,10 @@ public class NonBlockingNetworkTransport
success = true;
}
+ else
+ {
+ LOGGER.error("No Engine available.");
+ }
}
catch (IOException e)
{
@@ -221,5 +225,8 @@ public class NonBlockingNetworkTransport
}
}
-
+ public int getThreadPoolSize()
+ {
+ return _config.getThreadPoolSize();
+ }
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1684707&r1=1684706&r2=1684707&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
Wed Jun 10 16:30:36 2015
@@ -56,7 +56,7 @@ public class SelectorThread extends Thre
private final Selector _selector;
private final AtomicBoolean _closed = new AtomicBoolean();
- private final NetworkConnectionScheduler _scheduler = new
NetworkConnectionScheduler(this);
+ private final NetworkConnectionScheduler _scheduler;
private final NonBlockingNetworkTransport _transport;
private long _nextTimeout;
@@ -66,6 +66,7 @@ public class SelectorThread extends Thre
_transport = nonBlockingNetworkTransport;
_selector = Selector.open();
+ _scheduler = new NetworkConnectionScheduler(this, _transport);
}
public void addAcceptingSocket(final ServerSocketChannel socketChannel)
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java?rev=1684707&r1=1684706&r2=1684707&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
Wed Jun 10 16:30:36 2015
@@ -160,6 +160,12 @@ class TCPandSSLTransport implements Acce
}
@Override
+ public int getThreadPoolSize()
+ {
+ return Runtime.getRuntime().availableProcessors();
+ }
+
+ @Override
public int getReceiveBufferSize()
{
return _port.getReceiveBufferSize();
Added:
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java?rev=1684707&view=auto
==============================================================================
---
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java
(added)
+++
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java
Wed Jun 10 16:30:36 2015
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.transport;
+
+import org.apache.qpid.server.protocol.MultiVersionProtocolEngine;
+import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.transport.NetworkTransportConfiguration;
+import org.apache.qpid.transport.network.AggregateTicker;
+import org.apache.qpid.transport.network.TransportEncryption;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+public class NetworkConnectionSchedulerTest extends QpidTestCase
+{
+ private volatile boolean _keepRunningThreads = true;
+
+ public void testFairRead() throws IOException, InterruptedException
+ {
+ NetworkTransportConfiguration config = new
NetworkTransportConfiguration()
+ {
+
+ @Override
+ public boolean getTcpNoDelay()
+ {
+ return true;
+ }
+
+ @Override
+ public int getReceiveBufferSize()
+ {
+ return 1;
+ }
+
+ @Override
+ public int getSendBufferSize()
+ {
+ return 1;
+ }
+
+ @Override
+ public int getThreadPoolSize()
+ {
+ return 1;
+ }
+
+ @Override
+ public InetSocketAddress getAddress()
+ {
+ return new InetSocketAddress(0);
+ }
+
+ @Override
+ public boolean needClientAuth()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean wantClientAuth()
+ {
+ return false;
+ }
+
+ @Override
+ public Collection<String> getEnabledCipherSuites()
+ {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public Collection<String> getDisabledCipherSuites()
+ {
+ return Collections.emptyList();
+ }
+ };
+ MultiVersionProtocolEngineFactory engineFactory =
mock(MultiVersionProtocolEngineFactory.class);
+ MultiVersionProtocolEngine verboseEngine =
mock(MultiVersionProtocolEngine.class);
+ MultiVersionProtocolEngine timidEngine =
mock(MultiVersionProtocolEngine.class);
+
+
when(engineFactory.newProtocolEngine(any(SocketAddress.class))).thenReturn(verboseEngine).thenReturn(timidEngine);
+ when(verboseEngine.getAggregateTicker()).thenReturn(new
AggregateTicker());
+ when(timidEngine.getAggregateTicker()).thenReturn(new
AggregateTicker());
+
+ NonBlockingNetworkTransport transport = new
NonBlockingNetworkTransport(config, engineFactory, null,
EnumSet.of(TransportEncryption.NONE));
+
+ transport.start();
+ final int port = transport.getAcceptingPort();
+
+ Socket verboseSocket = new Socket();
+ verboseSocket.connect(new InetSocketAddress(port));
+ final OutputStream verboseOutputStream =
verboseSocket.getOutputStream();
+ Thread verboseSender = new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ while (_keepRunningThreads)
+ {
+ verboseOutputStream.write("Hello World".getBytes());
+ }
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ });
+
+ Socket timidSocket = new Socket();
+ timidSocket.connect(new InetSocketAddress(port));
+ final OutputStream timidOutputStream = timidSocket.getOutputStream();
+ Thread timidSender = new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ timidOutputStream.write("me too".getBytes());
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ });
+
+ verboseSender.start();
+ Thread.sleep(500l);
+ timidSender.start();
+ Thread.sleep(1000l);
+ verify(timidEngine, atLeast(6)).received(any(ByteBuffer.class));
+ _keepRunningThreads = false;
+ }
+
+
+}
Propchange:
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java?rev=1684707&r1=1684706&r2=1684707&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java
Wed Jun 10 16:30:36 2015
@@ -39,6 +39,8 @@ public interface NetworkTransportConfigu
// The amount of memory in bytes to allocate to the outgoing buffer
int getSendBufferSize();
+ int getThreadPoolSize();
+
InetSocketAddress getAddress();
boolean needClientAuth();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]