Author: elecharny
Date: Tue Dec 13 13:47:57 2011
New Revision: 1213692
URL: http://svn.apache.org/viewvc?rev=1213692&view=rev
Log:
o Extracted the code from the run() method to create two dedicated methods to
handle server addition and removal
o Added some Javadoc and comments
o Added some logs
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java?rev=1213692&r1=1213691&r2=1213692&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
Tue Dec 13 13:47:57 2011
@@ -59,6 +59,9 @@ import org.slf4j.LoggerFactory;
*
*/
public class NioSelectorProcessor implements SelectorProcessor {
+ /** A logger for this class */
+ private static final Logger LOGGER =
LoggerFactory.getLogger(NioSelectorProcessor.class);
+
/**
* A timeout used for the select, as we need to get out to deal with idle
* sessions
@@ -67,31 +70,21 @@ public class NioSelectorProcessor implem
private SelectorStrategy strategy;
- private static final Logger LOGGER =
LoggerFactory.getLogger(NioSelectorProcessor.class);
-
private Map<SocketAddress, ServerSocketChannel> serverSocketChannels = new
ConcurrentHashMap<SocketAddress, ServerSocketChannel>();
- /** Read buffer for all the incoming bytes */
+ /** Read buffer for all the incoming bytes (default to 64Kb) */
private ByteBuffer readBuffer = ByteBuffer.allocate(64 * 1024);
- /** Application buffer for all the outgoing messages */
- private ByteBuffer appBuffer = ByteBuffer.allocate(16 * 1024);
-
- // the thread polling and processing the I/O events
+ /** the thread polling and processing the I/O events */
private SelectorWorker worker = null;
- /**
- * new binded server to add to the selector {ServerSocketChannel, IoServer}
- * jvermillard : FIXME the typing is ugly !!!
- */
+ /** A queue containing the servers to bind to this selector */
private final Queue<Object[]> serversToAdd = new
ConcurrentLinkedQueue<Object[]>();
/** server to remove of the selector */
private final Queue<ServerSocketChannel> serversToRemove = new
ConcurrentLinkedQueue<ServerSocketChannel>();
- /**
- * new session freshly accepted, placed here for being added to the
selector
- */
+ /** new session freshly accepted, placed here for being added to the
selector */
private final Queue<NioTcpSession> sessionsToConnect = new
ConcurrentLinkedQueue<NioTcpSession>();
/** session to be removed of the selector */
@@ -292,26 +285,12 @@ public class NioSelectorProcessor implem
try {
// pop server sockets for removing
if (serversToRemove.size() > 0) {
- while (!serversToRemove.isEmpty()) {
- ServerSocketChannel channel =
serversToRemove.poll();
- SelectionKey key = serverKey.remove(channel);
-
- if (key == null) {
- LOGGER.error("The server socket was
already removed of the selector");
- } else {
- key.cancel();
- }
- }
+ processServerRemove();
}
// pop new server sockets for accepting
if (serversToAdd.size() > 0) {
- while (!serversToAdd.isEmpty()) {
- Object[] tmp = serversToAdd.poll();
- ServerSocketChannel channel =
(ServerSocketChannel) tmp[0];
- SelectionKey key = channel.register(selector,
SelectionKey.OP_ACCEPT);
- key.attach(tmp);
- }
+ processServerAdd();
}
// pop new session for starting read/write
@@ -369,6 +348,7 @@ public class NioSelectorProcessor implem
// stop the worker if needed
workerLock.lock();
+
try {
if (selector.keys().isEmpty()) {
worker = null;
@@ -382,6 +362,36 @@ public class NioSelectorProcessor implem
LOGGER.error("Unexpected exception : ", e);
}
}
+
+ /**
+ * Handles the servers removal
+ */
+ private void processServerRemove() {
+ while (!serversToRemove.isEmpty()) {
+ ServerSocketChannel channel = serversToRemove.poll();
+ SelectionKey key = serverKey.remove(channel);
+
+ if (key == null) {
+ LOGGER.error("The server socket was already removed of the
selector");
+ } else {
+ LOGGER.debug("Removing the server from this selector :
{}", key);
+ key.cancel();
+ }
+ }
+ }
+
+ /**
+ * Handles the servers addition
+ */
+ private void processServerAdd() throws IOException {
+ while (!serversToAdd.isEmpty()) {
+ Object[] tmp = serversToAdd.poll();
+ ServerSocketChannel channel = (ServerSocketChannel) tmp[0];
+ SelectionKey key = channel.register(selector,
SelectionKey.OP_ACCEPT);
+ key.attach(tmp);
+ LOGGER.debug("Accepted the server on this selector : {}", key);
+ }
+ }
/**
* Handles all the sessions that must be connected
@@ -459,6 +469,8 @@ public class NioSelectorProcessor implem
readBuffer.flip();
if (session.isSecured()) {
+ // We are reading data over a SSL/TLS encrypted
connection. Redirect
+ // the processing to the SslHelper class.
SslHelper sslHelper =
session.getAttribute(IoSession.SSL_HELPER);
if (sslHelper == null) {
@@ -499,6 +511,8 @@ public class NioSelectorProcessor implem
ByteBuffer buf = (ByteBuffer) wreq.getMessage();
+ // Note that if the connection is secured, the buffer
already
+ // contains encrypted data.
int wrote = session.getSocketChannel().write(buf);
if (LOGGER.isDebugEnabled()) {
@@ -506,8 +520,7 @@ public class NioSelectorProcessor implem
}
if (buf.remaining() == 0) {
- // completed write request, let's remove
- // it
+ // completed write request, let's remove it
queue.remove();
// complete the future
DefaultWriteFuture future = (DefaultWriteFuture)
wreq.getFuture();