Author: jvermillard
Date: Wed Apr 13 13:44:54 2011
New Revision: 1091776
URL: http://svn.apache.org/viewvc?rev=1091776&view=rev
Log:
Register newly accepted sessions with the selector for read events, and clear processed keys from the selected-key set
Modified:
mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSocketSession.java
Modified:
mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
URL:
http://svn.apache.org/viewvc/mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java?rev=1091776&r1=1091775&r2=1091776&view=diff
==============================================================================
---
mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
(original)
+++
mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
Wed Apr 13 13:44:54 2011
@@ -27,6 +27,7 @@ import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
@@ -34,7 +35,6 @@ import java.util.concurrent.ConcurrentLi
import org.apache.mina.IoServer;
import org.apache.mina.IoService;
-import org.apache.mina.IoSession;
import org.apache.mina.service.SelectorProcessor;
import org.apache.mina.service.SelectorStrategy;
import org.apache.mina.transport.tcp.nio.NioTcpServer;
@@ -57,7 +57,7 @@ public class NioSelectorProcessor implem
private Logger log;
- private Map<SocketAddress, ServerSocketChannel> serverSocketChannels = new
ConcurrentHashMap<SocketAddress,ServerSocketChannel>();
+ private Map<SocketAddress, ServerSocketChannel> serverSocketChannels = new
ConcurrentHashMap<SocketAddress, ServerSocketChannel>();
public NioSelectorProcessor(String name, SelectorStrategy strategy) {
this.strategy = strategy;
@@ -74,10 +74,10 @@ public class NioSelectorProcessor implem
private final Queue<ServerSocketChannel> serversToRemove = new
ConcurrentLinkedQueue<ServerSocketChannel>();
// new session freshly accepted, placed here for being added to the
selector
- private final Queue<IoSession> sessionsToConnect = new
ConcurrentLinkedQueue<IoSession>();
+ private final Queue<NioSocketSession> sessionsToConnect = new
ConcurrentLinkedQueue<NioSocketSession>();
// session to be removed of the selector
- private final Queue<IoSession> sessionsToClose = new
ConcurrentLinkedQueue<IoSession>();
+ private final Queue<NioSocketSession> sessionsToClose = new
ConcurrentLinkedQueue<NioSocketSession>();
/**
* Add a bound server channel for starting accepting new client
connections.
@@ -85,8 +85,8 @@ public class NioSelectorProcessor implem
* @param serverChannel
*/
private void add(ServerSocketChannel serverChannel, IoServer server) {
- log.debug("adding a server channel {} for server {}",
serverChannel,server);
- serversToAdd.add(new Object[]{serverChannel,server});
+ log.debug("adding a server channel {} for server {}", serverChannel,
server);
+ serversToAdd.add(new Object[] { serverChannel, server });
wakeupWorker();
}
@@ -132,7 +132,19 @@ public class NioSelectorProcessor implem
log.debug("create session");
SocketChannel socketChannel = (SocketChannel) clientSocket;
NioSocketSession session = new NioSocketSession((NioTcpServer)
service, socketChannel);
- // TODO : configure & register
+
+ // TODO : configure
+ try {
+ socketChannel.configureBlocking(false);
+ } catch (IOException e) {
+ log.error("Unexpected exception, while configuring socket as non
blocking", e);
+ }
+
+ // TODO : event session created
+
+ // add the session to the queue for being added to the selector
+ sessionsToConnect.add(session);
+ wakeupWorker();
}
/**
@@ -175,28 +187,54 @@ public class NioSelectorProcessor implem
if (serversToAdd.size() > 0) {
while (!serversToAdd.isEmpty()) {
Object[] tmp = serversToAdd.poll();
- ServerSocketChannel channel =
(ServerSocketChannel)tmp[0];
+ ServerSocketChannel channel =
(ServerSocketChannel) tmp[0];
SelectionKey key = channel.register(selector,
SelectionKey.OP_ACCEPT);
key.attach(tmp);
}
}
+ // pop new session for starting read/write
+ if (sessionsToConnect.size() > 0) {
+ while (!sessionsToConnect.isEmpty()) {
+ NioSocketSession session =
sessionsToConnect.poll();
+ session.getSocketChannel().register(selector,
SelectionKey.OP_READ);
+ }
+ }
log.debug("selecting...");
int readyCount = selector.select(SELECT_TIMEOUT);
log.debug("... done selecting : {}", readyCount);
if (readyCount > 0) {
+
// process selected keys
- for (SelectionKey key : selector.selectedKeys()) {
+ Iterator<SelectionKey> selectedKeys =
selector.selectedKeys().iterator();
+
+ while (selectedKeys.hasNext()) {
+ SelectionKey key = selectedKeys.next();
+ selectedKeys.remove();
+
+ if (!key.isValid()) {
+ continue;
+ }
+ selector.selectedKeys().remove(key);
+
+ if (key.isReadable()) {
+ log.debug("readable client {}", key);
+ // TODO
+ }
+
if (key.isAcceptable()) {
- log.debug("acceptable new client");
+ log.debug("acceptable new client {}", key);
+ ServerSocketChannel serverSocket =
(ServerSocketChannel) ((Object[]) key.attachment())[0];
+ IoServer server = (IoServer) (((Object[])
key.attachment())[1]);
// accepted connection
- SocketChannel newClientChannel =
((ServerSocketChannel) ((Object[])key.attachment())[0]).accept();
+ SocketChannel newClientChannel =
serverSocket.accept();
log.debug("client accepted");
// and give it's to the strategy
-
strategy.getSelectorForNewSession(NioSelectorProcessor.this).createSession(
(IoServer)(((Object[])key.attachment())[1]),
+
strategy.getSelectorForNewSession(NioSelectorProcessor.this).createSession(server,
newClientChannel);
}
+
}
}
} catch (IOException e) {
Modified:
mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSocketSession.java
URL:
http://svn.apache.org/viewvc/mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSocketSession.java?rev=1091776&r1=1091775&r2=1091776&view=diff
==============================================================================
---
mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSocketSession.java
(original)
+++
mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSocketSession.java
Wed Apr 13 13:44:54 2011
@@ -40,9 +40,12 @@ public class NioSocketSession extends Ab
NioSocketSession(NioTcpServer service,SocketChannel channel) {
super(service);
-
+ this.channel = channel;
}
+ public SocketChannel getSocketChannel() {
+ return channel;
+ }
/**
* {@inheritDoc}
*/