Author: jvermillard
Date: Sun Dec 11 12:53:04 2011
New Revision: 1212998

URL: http://svn.apache.org/viewvc?rev=1212998&view=rev
Log:
DIRMINA-870 closing session when the write queue is empty

Modified:
    mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
    mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java?rev=1212998&r1=1212997&r2=1212998&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java Sun Dec 11 12:53:04 2011
@@ -38,9 +38,9 @@ import java.util.concurrent.locks.Reentr
 
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLEngineResult.HandshakeStatus;
 import javax.net.ssl.SSLEngineResult.Status;
 import javax.net.ssl.SSLException;
-import javax.net.ssl.SSLEngineResult.HandshakeStatus;
 
 import org.apache.mina.api.IoServer;
 import org.apache.mina.api.IoService;
@@ -82,7 +82,7 @@ public class NioSelectorProcessor implem
 
     /** Application buffer for all the outgoing messages */
     private ByteBuffer appBuffer = ByteBuffer.allocate(16 * 1024);
-    
+
     /** An empty buffer used during the handshake phase */
     private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
 
@@ -206,56 +206,56 @@ public class NioSelectorProcessor implem
 
         // apply the default service socket configuration
         Boolean keepAlive = defaultConfig.isKeepAlive();
-        
+
         if (keepAlive != null) {
             session.getConfig().setKeepAlive(keepAlive);
         }
 
         Boolean oobInline = defaultConfig.isOobInline();
-        
+
         if (oobInline != null) {
             session.getConfig().setOobInline(oobInline);
         }
 
         Boolean reuseAddress = defaultConfig.isReuseAddress();
-        
+
         if (reuseAddress != null) {
             session.getConfig().setReuseAddress(reuseAddress);
         }
 
         Boolean tcpNoDelay = defaultConfig.isTcpNoDelay();
-        
+
         if (tcpNoDelay != null) {
             session.getConfig().setTcpNoDelay(tcpNoDelay);
         }
 
         Integer receiveBufferSize = defaultConfig.getReceiveBufferSize();
-        
+
         if (receiveBufferSize != null) {
             session.getConfig().setReceiveBufferSize(receiveBufferSize);
         }
 
         Integer sendBufferSize = defaultConfig.getSendBufferSize();
-        
+
         if (sendBufferSize != null) {
             session.getConfig().setSendBufferSize(sendBufferSize);
         }
 
         Integer trafficClass = defaultConfig.getTrafficClass();
-        
+
         if (trafficClass != null) {
             session.getConfig().setTrafficClass(trafficClass);
         }
 
         Integer soLinger = defaultConfig.getSoLinger();
-        
+
         if (soLinger != null) {
             session.getConfig().setSoLinger(soLinger);
         }
-        
+
         // Set the secured flag if the service is to be used over SSL/TLS
         if (service.isSecured()) {
-            session.initSecure( service.getSslContext() );
+            session.initSecure(service.getSslContext());
         }
 
         // event session created
@@ -288,112 +288,117 @@ public class NioSelectorProcessor implem
 
         // map for finding read keys associated with a given session
         private Map<NioTcpSession, SelectionKey> sessionReadKey = new HashMap<NioTcpSession, SelectionKey>();
-        
+
         private boolean handshaking = false;
 
         @Override
         public void run() {
-            if (selector == null) {
-                LOGGER.debug("opening a new selector");
+            try {
+                if (selector == null) {
+                    LOGGER.debug("opening a new selector");
 
-                try {
-                    selector = Selector.open();
-                } catch (IOException e) {
-                    LOGGER.error("IOException while opening a new Selector", e);
+                    try {
+                        selector = Selector.open();
+                    } catch (IOException e) {
+                        LOGGER.error("IOException while opening a new Selector", e);
+                    }
                 }
-            }
 
-            for (;;) {
-                try {
-                    // pop server sockets for removing
-                    if (serversToRemove.size() > 0) {
-                        while (!serversToRemove.isEmpty()) {
-                            ServerSocketChannel channel = serversToRemove.poll();
-                            SelectionKey key = serverKey.remove(channel);
-
-                            if (key == null) {
-                                LOGGER.error("The server socket was already removed of the selector");
-                            } else {
-                                key.cancel();
+                for (;;) {
+                    try {
+                        // pop server sockets for removing
+                        if (serversToRemove.size() > 0) {
+                            while (!serversToRemove.isEmpty()) {
+                                ServerSocketChannel channel = serversToRemove.poll();
+                                SelectionKey key = serverKey.remove(channel);
+
+                                if (key == null) {
+                                    LOGGER.error("The server socket was already removed of the selector");
+                                } else {
+                                    key.cancel();
+                                }
                             }
                         }
-                    }
-
-                    // pop new server sockets for accepting
-                    if (serversToAdd.size() > 0) {
-                        while (!serversToAdd.isEmpty()) {
-                            Object[] tmp = serversToAdd.poll();
-                            ServerSocketChannel channel = (ServerSocketChannel) tmp[0];
-                            SelectionKey key = channel.register(selector, SelectionKey.OP_ACCEPT);
-                            key.attach(tmp);
-                        }
-                    }
 
-                    // pop new session for starting read/write
-                    if (sessionsToConnect.size() > 0) {
-                        processConnectSessions();
-                    }
-
-                    // pop session for close, if any
-                    if (sessionsToClose.size() > 0) {
-                        processCloseSessions();
-                    }
-
-                    LOGGER.debug("selecting...");
-                    int readyCount = selector.select(SELECT_TIMEOUT);
-                    LOGGER.debug("... done selecting : {}", readyCount);
-
-                    if (readyCount > 0) {
-                        // process selected keys
-                        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
-
-                        // Loop on each SelectionKey and process any valid action
-                        while (selectedKeys.hasNext()) {
-                            SelectionKey key = selectedKeys.next();
-                            selectedKeys.remove();
-
-                            if (!key.isValid()) {
-                                continue;
+                        // pop new server sockets for accepting
+                        if (serversToAdd.size() > 0) {
+                            while (!serversToAdd.isEmpty()) {
+                                Object[] tmp = serversToAdd.poll();
+                                ServerSocketChannel channel = (ServerSocketChannel) tmp[0];
+                                SelectionKey key = channel.register(selector, SelectionKey.OP_ACCEPT);
+                                key.attach(tmp);
                             }
+                        }
 
-                            selector.selectedKeys().remove(key);
+                        // pop new session for starting read/write
+                        if (sessionsToConnect.size() > 0) {
+                            processConnectSessions();
+                        }
 
-                            if (key.isReadable()) {
-                                processRead(key);
-                            }
+                        // pop session for close, if any
+                        if (sessionsToClose.size() > 0) {
+                            processCloseSessions();
+                        }
 
-                            if (key.isWritable()) {
-                                processWrite(key);
-                            }
+                        LOGGER.debug("selecting...");
+                        int readyCount = selector.select(SELECT_TIMEOUT);
+                        LOGGER.debug("... done selecting : {}", readyCount);
+
+                        if (readyCount > 0) {
+                            // process selected keys
+                            Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
+
+                            // Loop on each SelectionKey and process any valid action
+                            while (selectedKeys.hasNext()) {
+                                SelectionKey key = selectedKeys.next();
+                                selectedKeys.remove();
+
+                                if (!key.isValid()) {
+                                    continue;
+                                }
+
+                                selector.selectedKeys().remove(key);
+
+                                if (key.isAcceptable()) {
+                                    processAccept(key);
+                                }
+
+                                if (key.isReadable()) {
+                                    processRead(key);
+                                }
+
+                                if (key.isWritable()) {
+                                    processWrite(key);
+                                }
 
-                            if (key.isAcceptable()) {
-                                processAccept(key);
                             }
                         }
-                    }
 
-                    // registering session with data in the write queue for
-                    // writing
-                    while (!flushingSessions.isEmpty()) {
-                        processFushSessions();
+                        // registering session with data in the write queue for
+                        // writing
+                        while (!flushingSessions.isEmpty()) {
+                            processFushSessions();
+                        }
+                    } catch (IOException e) {
+                        LOGGER.error("IOException while selecting selector", e);
                     }
-                } catch (IOException e) {
-                    LOGGER.error("IOException while selecting selector", e);
-                }
 
-                // stop the worker if needed
-                workerLock.lock();
-                try {
-                    if (selector.keys().isEmpty()) {
-                        worker = null;
-                        break;
+                    // stop the worker if needed
+                    workerLock.lock();
+                    try {
+                        if (selector.keys().isEmpty()) {
+                            worker = null;
+                            break;
+                        }
+                    } finally {
+                        workerLock.unlock();
                     }
-                } finally {
-                    workerLock.unlock();
                 }
+            } catch (Exception e) {
+                LOGGER.error("Unexpected exception : ", e);
             }
         }
-        
+
         /**
          * Handles all the sessions that must be connected
          */
@@ -408,14 +413,14 @@ public class NioSelectorProcessor implem
                 // will occur later.
                 if (!session.isSecured()) {
                     session.setConnected();
-                    
+
                     // fire the event
                     ((AbstractIoService) session.getService()).fireSessionCreated(session);
                     session.getFilterChain().processSessionOpened(session);
                 }
             }
         }
-        
+
         /**
          * Handles all the sessions that must be closed
          */
@@ -433,7 +438,7 @@ public class NioSelectorProcessor implem
                 ((AbstractIoService) session.getService()).fireSessionDestroyed(session);
             }
         }
-        
+
         /**
          * Processes the Accept action for the given SelectionKey
          */
@@ -445,14 +450,13 @@ public class NioSelectorProcessor implem
             SocketChannel newClientChannel = serverSocket.accept();
             LOGGER.debug("client accepted");
             // and give it's to the strategy
-            strategy.getSelectorForNewSession(NioSelectorProcessor.this).createSession(server,
-                    newClientChannel);
+            strategy.getSelectorForNewSession(NioSelectorProcessor.this).createSession(server, newClientChannel);
         }
-        
+
         /**
          * Processes the Read action for the given SelectionKey
          */
-        private void processRead(SelectionKey key) throws IOException{
+        private void processRead(SelectionKey key) throws IOException {
             LOGGER.debug("readable client {}", key);
             NioTcpSession session = (NioTcpSession) key.attachment();
             SocketChannel channel = session.getSocketChannel();
@@ -469,7 +473,7 @@ public class NioSelectorProcessor implem
                 // we have read some data
                 // limit at the current position & rewind buffer back to start & push to the chain
                 readBuffer.flip();
-                
+
                 if (session.isSecured() && !session.isConnectedSecured()) {
                     // Process the SSL handshake now
                     processHandShake(session, readBuffer);
@@ -478,120 +482,118 @@ public class NioSelectorProcessor implem
                 }
             }
         }
-        
+
         private boolean processHandShake(IoSession session, ByteBuffer inBuffer) throws SSLException {
-            SslHelper sslHelper = session.getAttribute( IoSession.SSL_HELPER );
-            
+            SslHelper sslHelper = session.getAttribute(IoSession.SSL_HELPER);
+
             if (sslHelper == null) {
                 throw new IllegalStateException();
             }
-            
+
             SSLEngine engine = sslHelper.getEngine();
             HandshakeStatus hsStatus = engine.getHandshakeStatus();
             boolean processingData = true;
-            
+
             // Start the Handshake if we aren't already processing a HandShake
             // and switch to the SECURING state
             if (!handshaking) {
                 engine.beginHandshake();
                 handshaking = true;
-                session.changeState( SessionState.SECURING );
+                session.changeState(SessionState.SECURING);
             }
-            
+
             hsStatus = engine.getHandshakeStatus();
 
             // If the SSLEngine has not be started, then the status will be NOT_HANDSHAKING
-            while ((hsStatus != HandshakeStatus.FINISHED) &&
-                   (hsStatus != HandshakeStatus.NOT_HANDSHAKING ) &&
-                   processingData) {
+            while ((hsStatus != HandshakeStatus.FINISHED) && (hsStatus != HandshakeStatus.NOT_HANDSHAKING)
+                    && processingData) {
                 switch (hsStatus) {
-                    case NEED_TASK :
-                        hsStatus = sslHelper.processTasks(engine);
-                        
-                        break;
-                        
-                    case NEED_WRAP :
-                        if ( LOGGER.isDebugEnabled()) {
-                            LOGGER.debug("{} processing the NEED_WRAP state", session);
-                        }
-                        
-                        int capacity = engine.getSession().getPacketBufferSize();
-                        ByteBuffer outBuffer = ByteBuffer.allocate(capacity);
-                        SSLEngineResult result = null;
-
-                        while (true) {
-                            result = engine.wrap(EMPTY_BUFFER, outBuffer);
-                            
-                            if (result.getStatus() == SSLEngineResult.Status.BUFFER_OVERFLOW) {
-                                // TODO : increase the AppBuffer size
-                            } else {
-                                break;
-                            }
-                        }
-        
-                        outBuffer.flip();
-                        session.write(outBuffer);
-                        hsStatus = result.getHandshakeStatus();
-
-                        // We continue to loop while we don't expect messages to unwrap,
-                        // otherwise, we have to exit the loop.
-                        processingData = (hsStatus != HandshakeStatus.NEED_UNWRAP);
+                case NEED_TASK:
+                    hsStatus = sslHelper.processTasks(engine);
 
-                        break;
-                        
-                    case NEED_UNWRAP :
-                        Status status = sslHelper.processUnwrap(engine, inBuffer, EMPTY_BUFFER);
-
-                        if ( status == Status.BUFFER_UNDERFLOW) {
-                            // Read more data
-                            processingData = false;
+                    break;
+
+                case NEED_WRAP:
+                    if (LOGGER.isDebugEnabled()) {
+                        LOGGER.debug("{} processing the NEED_WRAP state", session);
+                    }
+
+                    int capacity = engine.getSession().getPacketBufferSize();
+                    ByteBuffer outBuffer = ByteBuffer.allocate(capacity);
+                    SSLEngineResult result = null;
+
+                    while (true) {
+                        result = engine.wrap(EMPTY_BUFFER, outBuffer);
+
+                        if (result.getStatus() == SSLEngineResult.Status.BUFFER_OVERFLOW) {
+                            // TODO : increase the AppBuffer size
                         } else {
-                            hsStatus = engine.getHandshakeStatus();
+                            break;
                         }
-                        
-                        break;
+                    }
+
+                    outBuffer.flip();
+                    session.write(outBuffer);
+                    hsStatus = result.getHandshakeStatus();
+
+                    // We continue to loop while we don't expect messages to unwrap,
+                    // otherwise, we have to exit the loop.
+                    processingData = (hsStatus != HandshakeStatus.NEED_UNWRAP);
+
+                    break;
+
+                case NEED_UNWRAP:
+                    Status status = sslHelper.processUnwrap(engine, inBuffer, EMPTY_BUFFER);
+
+                    if (status == Status.BUFFER_UNDERFLOW) {
+                        // Read more data
+                        processingData = false;
+                    } else {
+                        hsStatus = engine.getHandshakeStatus();
+                    }
+
+                    break;
                 }
             }
-            
+
             if (hsStatus == HandshakeStatus.FINISHED) {
-                if ( LOGGER.isDebugEnabled()) {
+                if (LOGGER.isDebugEnabled()) {
                     LOGGER.debug("{} processing the FINISHED state", session);
                 }
-                
+
                 session.changeState(SessionState.SECURED);
                 handshaking = false;
 
                 return true;
             }
-            
+
             return false;
         }
-        
+
         /**
          * Processes the Write action for the given SelectionKey
          */
         private void processWrite(SelectionKey key) throws IOException {
-            if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug("writable session : {}", key.attachment());
-            }
-            
             NioTcpSession session = (NioTcpSession) key.attachment();
+
+            LOGGER.debug("writable session : {}", session);
+
             session.setNotRegisteredForWrite();
 
             // write from the session write queue
             boolean isEmpty = false;
-            
+
             try {
                 Queue<WriteRequest> queue = session.acquireWriteQueue();
 
                 do {
                     // get a write request from the queue
                     WriteRequest wreq = queue.peek();
-                    
+
                     if (wreq == null) {
                         break;
                     }
-                    
+
                     ByteBuffer buf = (ByteBuffer) wreq.getMessage();
 
                     int wrote = session.getSocketChannel().write(buf);
@@ -606,7 +608,7 @@ public class NioSelectorProcessor implem
                         queue.remove();
                         // complete the future
                         DefaultWriteFuture future = (DefaultWriteFuture) wreq.getFuture();
-                        
+
                         if (future != null) {
                             future.complete();
                         }
@@ -617,7 +619,7 @@ public class NioSelectorProcessor implem
                         break;
                     }
                 } while (!queue.isEmpty());
-                
+
                 isEmpty = queue.isEmpty();
             } finally {
                 session.releaseWriteQueue();
@@ -626,22 +628,28 @@ public class NioSelectorProcessor implem
             // if the session is no more interested in writing, we need
             // to stop listening for OP_WRITE events
             if (isEmpty) {
-                // a key registered for read ? (because we can have a
-                // Selector for reads and another for the writes
-                SelectionKey readKey = sessionReadKey.get(session);
-                
-                if (readKey != null) {
-                    LOGGER.debug("registering key for only reading");
-                    SelectionKey mykey = session.getSocketChannel().register(selector,
-                            SelectionKey.OP_READ, session);
-                    sessionReadKey.put(session, mykey);
+                if (session.isClosing()) {
+                    LOGGER.debug("closing session {} have empty write queue, so we close it", session);
+                    // we were flushing writes, now we do the close
+                    session.getSocketChannel().close();
                 } else {
-                    LOGGER.debug("cancel key for writing");
-                    session.getSocketChannel().keyFor(selector).cancel();
+                    // a key registered for read ? (because we can have a
+                    // Selector for reads and another for the writes
+                    SelectionKey readKey = sessionReadKey.get(session);
+
+                    if (readKey != null) {
+                        LOGGER.debug("registering key for only reading");
+                        SelectionKey mykey = session.getSocketChannel().register(selector, SelectionKey.OP_READ,
+                                session);
+                        sessionReadKey.put(session, mykey);
+                    } else {
+                        LOGGER.debug("cancel key for writing");
+                        session.getSocketChannel().keyFor(selector).cancel();
+                    }
                 }
             }
         }
-        
+
         /**
          * Flushes the sessions
          */
@@ -650,7 +658,7 @@ public class NioSelectorProcessor implem
             // a key registered for read ? (because we can have a
             // Selector for reads and another for the writes
             SelectionKey readKey = sessionReadKey.get(session);
-            
+
             if (readKey != null) {
                 // register for read/write
                 SelectionKey key = session.getSocketChannel().register(selector,

Modified: mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java?rev=1212998&r1=1212997&r2=1212998&view=diff
==============================================================================
--- mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java (original)
+++ mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java Sun Dec 11 12:53:04 2011
@@ -107,6 +107,7 @@ public class HttpTest {
             session.write(new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpStatus.SUCCESS_OK, headers));
             session.write(content);
             session.write(new HttpEndOfContent());
+            session.close(false);
 
         }
     }


Reply via email to