Author: rgodfrey
Date: Thu Jan 29 19:28:45 2015
New Revision: 1655825
URL: http://svn.apache.org/r1655825
Log:
Correctly deregister connections from the selector, and also optimise the case
where work can be rescheduled on the same thread
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java?rev=1655825&r1=1655824&r2=1655825&view=diff
==============================================================================
---
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
(original)
+++
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
Thu Jan 29 19:28:45 2015
@@ -165,11 +165,11 @@ public class NonBlockingSenderReceiver
}
_receiver.setTransportBlockedForWriting(!doWrite());
- doRead();
+ boolean dataRead = doRead();
_fullyWritten = doWrite();
_receiver.setTransportBlockedForWriting(!_fullyWritten);
- if(_workDone && _netInputBuffer != null &&
_netInputBuffer.position() != 0)
+ if(dataRead || (_workDone && _netInputBuffer != null &&
_netInputBuffer.position() != 0))
{
_stateChanged.set(true);
}
@@ -202,64 +202,6 @@ public class NonBlockingSenderReceiver
}
-
-/* public void run()
- {
- LOGGER.debug("I/O for thread " + _remoteSocketAddress + " started");
-
-
- while (!_closed.get())
- {
-
- try
- {
- long currentTime = System.currentTimeMillis();
- int tick = _ticker.getTimeToNextTick(currentTime);
- if(tick <= 0)
- {
- tick = _ticker.tick(currentTime);
- }
-
- _selector.select(tick <= 0 ? 1 : tick);
- Set<SelectionKey> selectionKeys = _selector.selectedKeys();
- selectionKeys.clear();
-
- _receiver.setTransportBlockedForWriting(!doWrite());
- doRead();
- _fullyWritten = doWrite();
- _receiver.setTransportBlockedForWriting(!_fullyWritten);
-
- _socketChannel.register(_selector,
- _fullyWritten
- ? SelectionKey.OP_READ
- : (SelectionKey.OP_WRITE |
SelectionKey.OP_READ));
-
- }
- catch (IOException e)
- {
- LOGGER.info("Exception performing I/O for thread '" +
_remoteSocketAddress + "': " + e);
- close();
- }
- }
-
- try(Selector selector = _selector; SocketChannel channel =
_socketChannel)
- {
- while(!doWrite())
- {
- }
-
- _receiver.closed();
- }
- catch (IOException e)
- {
- LOGGER.info("Exception performing final write/close for thread '"
+ _remoteSocketAddress + "': " + e);
- }
- finally
- {
- LOGGER.debug("Shutting down IO thread for " +
_remoteSocketAddress);
- }
- }*/
-
@Override
public void flush()
{
@@ -368,9 +310,9 @@ public class NonBlockingSenderReceiver
}
}
- private void doRead() throws IOException
+ private boolean doRead() throws IOException
{
-
+ boolean readData = false;
if(_transportEncryption == TransportEncryption.NONE)
{
int remaining = 0;
@@ -381,6 +323,10 @@ public class NonBlockingSenderReceiver
_currentBuffer = ByteBuffer.allocate(_receiveBufSize);
}
int read = _socketChannel.read(_currentBuffer);
+ if(read > 0)
+ {
+ readData = true;
+ }
if (read == -1)
{
_closed.set(true);
@@ -406,7 +352,10 @@ public class NonBlockingSenderReceiver
{
_closed.set(true);
}
-
+ else if(read > 0)
+ {
+ readData = true;
+ }
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Read " + read + " encrypted bytes ");
@@ -427,6 +376,10 @@ public class NonBlockingSenderReceiver
appInputBuffer.flip();
unwrapped = appInputBuffer.remaining();
+ if(unwrapped > 0)
+ {
+ readData = true;
+ }
_receiver.received(appInputBuffer);
}
while(unwrapped > 0 || tasksRun);
@@ -470,12 +423,13 @@ public class NonBlockingSenderReceiver
{
_onTransportEncryptionAction.run();
_netInputBuffer.compact();
- doRead();
+ readData = doRead();
}
break;
}
}
}
+ return readData;
}
private boolean runSSLEngineTasks(final SSLEngineResult status)
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java?rev=1655825&r1=1655824&r2=1655825&view=diff
==============================================================================
---
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java
(original)
+++
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java
Thu Jan 29 19:28:45 2015
@@ -31,6 +31,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Created by keith on 28/01/2015.
@@ -66,70 +67,64 @@ public class SelectorThread extends Thre
try
{
- try
+ while (!_closed.get())
{
- while (!_closed.get())
- {
- _selector.select(nextTimeout);
+ _selector.select(nextTimeout);
- List<NonBlockingConnection> toBeScheduled = new
ArrayList<>();
+ List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
- Set<SelectionKey> selectionKeys = _selector.selectedKeys();
- for (SelectionKey key : selectionKeys)
- {
- NonBlockingConnection connection =
(NonBlockingConnection) key.attachment();
+ Set<SelectionKey> selectionKeys = _selector.selectedKeys();
+ for (SelectionKey key : selectionKeys)
+ {
+ NonBlockingConnection connection = (NonBlockingConnection)
key.attachment();
- key.channel().register(_selector, 0);
+ key.channel().register(_selector, 0);
- toBeScheduled.add(connection);
- _unscheduledConnections.remove(connection);
+ toBeScheduled.add(connection);
+ _unscheduledConnections.remove(connection);
- }
- selectionKeys.clear();
+ }
+ selectionKeys.clear();
- while (_unregisteredConnections.peek() != null)
- {
- NonBlockingConnection unregisteredConnection =
_unregisteredConnections.poll();
- _unscheduledConnections.add(unregisteredConnection);
+ while (_unregisteredConnections.peek() != null)
+ {
+ NonBlockingConnection unregisteredConnection =
_unregisteredConnections.poll();
+ _unscheduledConnections.add(unregisteredConnection);
- final int ops = (unregisteredConnection.canRead() ?
SelectionKey.OP_READ : 0)
- |
(unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0);
-
unregisteredConnection.getSocketChannel().register(_selector, ops,
unregisteredConnection);
+ final int ops = (unregisteredConnection.canRead() ?
SelectionKey.OP_READ : 0)
+ |
(unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0);
+
unregisteredConnection.getSocketChannel().register(_selector, ops,
unregisteredConnection);
- }
+ }
- long currentTime = System.currentTimeMillis();
- Iterator<NonBlockingConnection> iterator =
_unscheduledConnections.iterator();
- nextTimeout = Integer.MAX_VALUE;
- while (iterator.hasNext())
- {
- NonBlockingConnection connection = iterator.next();
+ long currentTime = System.currentTimeMillis();
+ Iterator<NonBlockingConnection> iterator =
_unscheduledConnections.iterator();
+ nextTimeout = Integer.MAX_VALUE;
+ while (iterator.hasNext())
+ {
+ NonBlockingConnection connection = iterator.next();
- int period =
connection.getTicker().getTimeToNextTick(currentTime);
- if (period < 0 || connection.isStateChanged())
- {
- toBeScheduled.add(connection);
- iterator.remove();
- }
- else
- {
- nextTimeout = Math.min(period, nextTimeout);
- }
+ int period =
connection.getTicker().getTimeToNextTick(currentTime);
+ if (period < 0 || connection.isStateChanged())
+ {
+ toBeScheduled.add(connection);
+ connection.getSocketChannel().register(_selector,
0).cancel();
+ iterator.remove();
}
-
- for (NonBlockingConnection connection : toBeScheduled)
+ else
{
- _scheduler.schedule(connection);
+ nextTimeout = Math.min(period, nextTimeout);
}
+ }
+ for (NonBlockingConnection connection : toBeScheduled)
+ {
+ _scheduler.schedule(connection);
}
- }
- finally
- {
- _selector.close();
+
}
}
catch (IOException e)
@@ -137,6 +132,18 @@ public class SelectorThread extends Thre
//TODO
e.printStackTrace();
}
+ finally
+ {
+ try
+ {
+ _selector.close();
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
@@ -164,26 +171,52 @@ public class SelectorThread extends Thre
private class NetworkConnectionScheduler
{
private final ScheduledThreadPoolExecutor _executor;
+ private final AtomicInteger _running = new AtomicInteger();
+ private final int _poolSize;
private NetworkConnectionScheduler()
{
- _executor = new
ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors());
+ _poolSize = Runtime.getRuntime().availableProcessors();
+ _executor = new ScheduledThreadPoolExecutor(_poolSize);
+ _executor.prestartAllCoreThreads();
}
public void processConnection(final NonBlockingConnection connection)
{
- boolean closed = connection.doWork();
-
- if (!closed)
+ try
{
- if (connection.isStateChanged())
+ _running.incrementAndGet();
+ boolean rerun;
+ do
{
- schedule(connection);
- }
- else
- {
- SelectorThread.this.addConnection(connection);
- }
+ rerun = false;
+ boolean closed = connection.doWork();
+
+ if (!closed)
+ {
+
+ if (connection.isStateChanged())
+ {
+ if (_running.get() == _poolSize)
+ {
+ schedule(connection);
+ }
+ else
+ {
+ rerun = true;
+ }
+ }
+ else
+ {
+ SelectorThread.this.addConnection(connection);
+ }
+ }
+
+ } while (rerun);
+ }
+ finally
+ {
+ _running.decrementAndGet();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]