Author: jvermillard
Date: Wed Apr 13 13:44:54 2011
New Revision: 1091776

URL: http://svn.apache.org/viewvc?rev=1091776&view=rev
Log:
Register new sessions for read and clear the selected-key set

Modified:
    
mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
    
mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSocketSession.java

Modified: 
mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
URL: 
http://svn.apache.org/viewvc/mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java?rev=1091776&r1=1091775&r2=1091776&view=diff
==============================================================================
--- 
mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
 (original)
+++ 
mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
 Wed Apr 13 13:44:54 2011
@@ -27,6 +27,7 @@ import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -34,7 +35,6 @@ import java.util.concurrent.ConcurrentLi
 
 import org.apache.mina.IoServer;
 import org.apache.mina.IoService;
-import org.apache.mina.IoSession;
 import org.apache.mina.service.SelectorProcessor;
 import org.apache.mina.service.SelectorStrategy;
 import org.apache.mina.transport.tcp.nio.NioTcpServer;
@@ -57,7 +57,7 @@ public class NioSelectorProcessor implem
 
     private Logger log;
 
-    private Map<SocketAddress, ServerSocketChannel> serverSocketChannels = new 
ConcurrentHashMap<SocketAddress,ServerSocketChannel>();
+    private Map<SocketAddress, ServerSocketChannel> serverSocketChannels = new 
ConcurrentHashMap<SocketAddress, ServerSocketChannel>();
 
     public NioSelectorProcessor(String name, SelectorStrategy strategy) {
         this.strategy = strategy;
@@ -74,10 +74,10 @@ public class NioSelectorProcessor implem
     private final Queue<ServerSocketChannel> serversToRemove = new 
ConcurrentLinkedQueue<ServerSocketChannel>();
 
     // new session freshly accepted, placed here for being added to the 
selector
-    private final Queue<IoSession> sessionsToConnect = new 
ConcurrentLinkedQueue<IoSession>();
+    private final Queue<NioSocketSession> sessionsToConnect = new 
ConcurrentLinkedQueue<NioSocketSession>();
 
     // session to be removed of the selector
-    private final Queue<IoSession> sessionsToClose = new 
ConcurrentLinkedQueue<IoSession>();
+    private final Queue<NioSocketSession> sessionsToClose = new 
ConcurrentLinkedQueue<NioSocketSession>();
 
     /**
      * Add a bound server channel for starting accepting new client 
connections.
@@ -85,8 +85,8 @@ public class NioSelectorProcessor implem
      * @param serverChannel
      */
     private void add(ServerSocketChannel serverChannel, IoServer server) {
-        log.debug("adding a server channel {} for server {}", 
serverChannel,server);
-        serversToAdd.add(new Object[]{serverChannel,server});
+        log.debug("adding a server channel {} for server {}", serverChannel, 
server);
+        serversToAdd.add(new Object[] { serverChannel, server });
         wakeupWorker();
     }
 
@@ -132,7 +132,19 @@ public class NioSelectorProcessor implem
         log.debug("create session");
         SocketChannel socketChannel = (SocketChannel) clientSocket;
         NioSocketSession session = new NioSocketSession((NioTcpServer) 
service, socketChannel);
-        // TODO : configure & register
+
+        // TODO : configure
+        try {
+            socketChannel.configureBlocking(false);
+        } catch (IOException e) {
+            log.error("Unexpected exception, while configuring socket as non 
blocking", e);
+        }
+
+        // TODO : event session created
+
+        // add the session to the queue for being added to the selector
+        sessionsToConnect.add(session);
+        wakeupWorker();
     }
 
     /**
@@ -175,28 +187,54 @@ public class NioSelectorProcessor implem
                     if (serversToAdd.size() > 0) {
                         while (!serversToAdd.isEmpty()) {
                             Object[] tmp = serversToAdd.poll();
-                            ServerSocketChannel channel = 
(ServerSocketChannel)tmp[0];
+                            ServerSocketChannel channel = 
(ServerSocketChannel) tmp[0];
                             SelectionKey key = channel.register(selector, 
SelectionKey.OP_ACCEPT);
                             key.attach(tmp);
                         }
                     }
 
+                    // pop new session for starting read/write
+                    if (sessionsToConnect.size() > 0) {
+                        while (!sessionsToConnect.isEmpty()) {
+                            NioSocketSession session = 
sessionsToConnect.poll();
+                            session.getSocketChannel().register(selector, 
SelectionKey.OP_READ);
+                        }
+                    }
                     log.debug("selecting...");
                     int readyCount = selector.select(SELECT_TIMEOUT);
                     log.debug("... done selecting : {}", readyCount);
 
                     if (readyCount > 0) {
+
                         // process selected keys
-                        for (SelectionKey key : selector.selectedKeys()) {
+                        Iterator<SelectionKey> selectedKeys = 
selector.selectedKeys().iterator();
+
+                        while (selectedKeys.hasNext()) {
+                            SelectionKey key = selectedKeys.next();
+                            selectedKeys.remove();
+
+                            if (!key.isValid()) {
+                                continue;
+                            }
+                            selector.selectedKeys().remove(key);
+
+                            if (key.isReadable()) {
+                                log.debug("readable client {}", key);
+                                // TODO
+                            }
+
                             if (key.isAcceptable()) {
-                                log.debug("acceptable new client");
+                                log.debug("acceptable new client {}", key);
+                                ServerSocketChannel serverSocket = 
(ServerSocketChannel) ((Object[]) key.attachment())[0];
+                                IoServer server = (IoServer) (((Object[]) 
key.attachment())[1]);
                                 // accepted connection
-                                SocketChannel newClientChannel = 
((ServerSocketChannel) ((Object[])key.attachment())[0]).accept();
+                                SocketChannel newClientChannel = 
serverSocket.accept();
                                 log.debug("client accepted");
                                 // and give it's to the strategy
-                                
strategy.getSelectorForNewSession(NioSelectorProcessor.this).createSession( 
(IoServer)(((Object[])key.attachment())[1]),
+                                
strategy.getSelectorForNewSession(NioSelectorProcessor.this).createSession(server,
                                         newClientChannel);
                             }
+
                         }
                     }
                 } catch (IOException e) {

Modified: 
mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSocketSession.java
URL: 
http://svn.apache.org/viewvc/mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSocketSession.java?rev=1091776&r1=1091775&r2=1091776&view=diff
==============================================================================
--- 
mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSocketSession.java
 (original)
+++ 
mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSocketSession.java
 Wed Apr 13 13:44:54 2011
@@ -40,9 +40,12 @@ public class NioSocketSession extends Ab
     
     NioSocketSession(NioTcpServer service,SocketChannel channel) {
         super(service);
-        
+        this.channel = channel;
     }
 
+    public SocketChannel getSocketChannel() {
+        return channel;
+    }
     /**
      * {@inheritDoc}
      */


Reply via email to