Author: jvermillard
Date: Sun Dec 11 12:53:04 2011
New Revision: 1212998
URL: http://svn.apache.org/viewvc?rev=1212998&view=rev
Log:
DIRMINA-870 closing session when the write queue is empty
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java?rev=1212998&r1=1212997&r2=1212998&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
Sun Dec 11 12:53:04 2011
@@ -38,9 +38,9 @@ import java.util.concurrent.locks.Reentr
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLEngineResult.Status;
import javax.net.ssl.SSLException;
-import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import org.apache.mina.api.IoServer;
import org.apache.mina.api.IoService;
@@ -82,7 +82,7 @@ public class NioSelectorProcessor implem
/** Application buffer for all the outgoing messages */
private ByteBuffer appBuffer = ByteBuffer.allocate(16 * 1024);
-
+
/** An empty buffer used during the handshake phase */
private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
@@ -206,56 +206,56 @@ public class NioSelectorProcessor implem
// apply the default service socket configuration
Boolean keepAlive = defaultConfig.isKeepAlive();
-
+
if (keepAlive != null) {
session.getConfig().setKeepAlive(keepAlive);
}
Boolean oobInline = defaultConfig.isOobInline();
-
+
if (oobInline != null) {
session.getConfig().setOobInline(oobInline);
}
Boolean reuseAddress = defaultConfig.isReuseAddress();
-
+
if (reuseAddress != null) {
session.getConfig().setReuseAddress(reuseAddress);
}
Boolean tcpNoDelay = defaultConfig.isTcpNoDelay();
-
+
if (tcpNoDelay != null) {
session.getConfig().setTcpNoDelay(tcpNoDelay);
}
Integer receiveBufferSize = defaultConfig.getReceiveBufferSize();
-
+
if (receiveBufferSize != null) {
session.getConfig().setReceiveBufferSize(receiveBufferSize);
}
Integer sendBufferSize = defaultConfig.getSendBufferSize();
-
+
if (sendBufferSize != null) {
session.getConfig().setSendBufferSize(sendBufferSize);
}
Integer trafficClass = defaultConfig.getTrafficClass();
-
+
if (trafficClass != null) {
session.getConfig().setTrafficClass(trafficClass);
}
Integer soLinger = defaultConfig.getSoLinger();
-
+
if (soLinger != null) {
session.getConfig().setSoLinger(soLinger);
}
-
+
// Set the secured flag if the service is to be used over SSL/TLS
if (service.isSecured()) {
- session.initSecure( service.getSslContext() );
+ session.initSecure(service.getSslContext());
}
// event session created
@@ -288,112 +288,117 @@ public class NioSelectorProcessor implem
// map for finding read keys associated with a given session
private Map<NioTcpSession, SelectionKey> sessionReadKey = new
HashMap<NioTcpSession, SelectionKey>();
-
+
private boolean handshaking = false;
@Override
public void run() {
- if (selector == null) {
- LOGGER.debug("opening a new selector");
+ try {
+ if (selector == null) {
+ LOGGER.debug("opening a new selector");
- try {
- selector = Selector.open();
- } catch (IOException e) {
- LOGGER.error("IOException while opening a new Selector",
e);
+ try {
+ selector = Selector.open();
+ } catch (IOException e) {
+ LOGGER.error("IOException while opening a new
Selector", e);
+ }
}
- }
- for (;;) {
- try {
- // pop server sockets for removing
- if (serversToRemove.size() > 0) {
- while (!serversToRemove.isEmpty()) {
- ServerSocketChannel channel =
serversToRemove.poll();
- SelectionKey key = serverKey.remove(channel);
-
- if (key == null) {
- LOGGER.error("The server socket was already
removed of the selector");
- } else {
- key.cancel();
+ for (;;) {
+ try {
+ // pop server sockets for removing
+ if (serversToRemove.size() > 0) {
+ while (!serversToRemove.isEmpty()) {
+ ServerSocketChannel channel =
serversToRemove.poll();
+ SelectionKey key = serverKey.remove(channel);
+
+ if (key == null) {
+ LOGGER.error("The server socket was
already removed of the selector");
+ } else {
+ key.cancel();
+ }
}
}
- }
-
- // pop new server sockets for accepting
- if (serversToAdd.size() > 0) {
- while (!serversToAdd.isEmpty()) {
- Object[] tmp = serversToAdd.poll();
- ServerSocketChannel channel =
(ServerSocketChannel) tmp[0];
- SelectionKey key = channel.register(selector,
SelectionKey.OP_ACCEPT);
- key.attach(tmp);
- }
- }
- // pop new session for starting read/write
- if (sessionsToConnect.size() > 0) {
- processConnectSessions();
- }
-
- // pop session for close, if any
- if (sessionsToClose.size() > 0) {
- processCloseSessions();
- }
-
- LOGGER.debug("selecting...");
- int readyCount = selector.select(SELECT_TIMEOUT);
- LOGGER.debug("... done selecting : {}", readyCount);
-
- if (readyCount > 0) {
- // process selected keys
- Iterator<SelectionKey> selectedKeys =
selector.selectedKeys().iterator();
-
- // Loop on each SelectionKey and process any valid
action
- while (selectedKeys.hasNext()) {
- SelectionKey key = selectedKeys.next();
- selectedKeys.remove();
-
- if (!key.isValid()) {
- continue;
+ // pop new server sockets for accepting
+ if (serversToAdd.size() > 0) {
+ while (!serversToAdd.isEmpty()) {
+ Object[] tmp = serversToAdd.poll();
+ ServerSocketChannel channel =
(ServerSocketChannel) tmp[0];
+ SelectionKey key = channel.register(selector,
SelectionKey.OP_ACCEPT);
+ key.attach(tmp);
}
+ }
- selector.selectedKeys().remove(key);
+ // pop new session for starting read/write
+ if (sessionsToConnect.size() > 0) {
+ processConnectSessions();
+ }
- if (key.isReadable()) {
- processRead(key);
- }
+ // pop session for close, if any
+ if (sessionsToClose.size() > 0) {
+ processCloseSessions();
+ }
- if (key.isWritable()) {
- processWrite(key);
- }
+ LOGGER.debug("selecting...");
+ int readyCount = selector.select(SELECT_TIMEOUT);
+ LOGGER.debug("... done selecting : {}", readyCount);
+
+ if (readyCount > 0) {
+ // process selected keys
+ Iterator<SelectionKey> selectedKeys =
selector.selectedKeys().iterator();
+
+ // Loop on each SelectionKey and process any valid
action
+ while (selectedKeys.hasNext()) {
+ SelectionKey key = selectedKeys.next();
+ selectedKeys.remove();
+
+ if (!key.isValid()) {
+ continue;
+ }
+
+ selector.selectedKeys().remove(key);
+
+ if (key.isAcceptable()) {
+ processAccept(key);
+ }
+
+ if (key.isReadable()) {
+ processRead(key);
+ }
+
+ if (key.isWritable()) {
+ processWrite(key);
+ }
- if (key.isAcceptable()) {
- processAccept(key);
}
}
- }
- // registering session with data in the write queue for
- // writing
- while (!flushingSessions.isEmpty()) {
- processFushSessions();
+ // registering session with data in the write queue for
+ // writing
+ while (!flushingSessions.isEmpty()) {
+ processFushSessions();
+ }
+ } catch (IOException e) {
+ LOGGER.error("IOException while selecting selector",
e);
}
- } catch (IOException e) {
- LOGGER.error("IOException while selecting selector", e);
- }
- // stop the worker if needed
- workerLock.lock();
- try {
- if (selector.keys().isEmpty()) {
- worker = null;
- break;
+ // stop the worker if needed
+ workerLock.lock();
+ try {
+ if (selector.keys().isEmpty()) {
+ worker = null;
+ break;
+ }
+ } finally {
+ workerLock.unlock();
}
- } finally {
- workerLock.unlock();
}
+ } catch (Exception e) {
+ LOGGER.error("Unexpected exception : ", e);
}
}
-
+
/**
* Handles all the sessions that must be connected
*/
@@ -408,14 +413,14 @@ public class NioSelectorProcessor implem
// will occur later.
if (!session.isSecured()) {
session.setConnected();
-
+
// fire the event
((AbstractIoService)
session.getService()).fireSessionCreated(session);
session.getFilterChain().processSessionOpened(session);
}
}
}
-
+
/**
* Handles all the sessions that must be closed
*/
@@ -433,7 +438,7 @@ public class NioSelectorProcessor implem
((AbstractIoService)
session.getService()).fireSessionDestroyed(session);
}
}
-
+
/**
* Processes the Accept action for the given SelectionKey
*/
@@ -445,14 +450,13 @@ public class NioSelectorProcessor implem
SocketChannel newClientChannel = serverSocket.accept();
LOGGER.debug("client accepted");
// and give it's to the strategy
-
strategy.getSelectorForNewSession(NioSelectorProcessor.this).createSession(server,
- newClientChannel);
+
strategy.getSelectorForNewSession(NioSelectorProcessor.this).createSession(server,
newClientChannel);
}
-
+
/**
* Processes the Read action for the given SelectionKey
*/
- private void processRead(SelectionKey key) throws IOException{
+ private void processRead(SelectionKey key) throws IOException {
LOGGER.debug("readable client {}", key);
NioTcpSession session = (NioTcpSession) key.attachment();
SocketChannel channel = session.getSocketChannel();
@@ -469,7 +473,7 @@ public class NioSelectorProcessor implem
// we have read some data
// limit at the current position & rewind buffer back to start
& push to the chain
readBuffer.flip();
-
+
if (session.isSecured() && !session.isConnectedSecured()) {
// Process the SSL handshake now
processHandShake(session, readBuffer);
@@ -478,120 +482,118 @@ public class NioSelectorProcessor implem
}
}
}
-
+
private boolean processHandShake(IoSession session, ByteBuffer
inBuffer) throws SSLException {
- SslHelper sslHelper = session.getAttribute( IoSession.SSL_HELPER );
-
+ SslHelper sslHelper = session.getAttribute(IoSession.SSL_HELPER);
+
if (sslHelper == null) {
throw new IllegalStateException();
}
-
+
SSLEngine engine = sslHelper.getEngine();
HandshakeStatus hsStatus = engine.getHandshakeStatus();
boolean processingData = true;
-
+
// Start the Handshake if we aren't already processing a HandShake
// and switch to the SECURING state
if (!handshaking) {
engine.beginHandshake();
handshaking = true;
- session.changeState( SessionState.SECURING );
+ session.changeState(SessionState.SECURING);
}
-
+
hsStatus = engine.getHandshakeStatus();
// If the SSLEngine has not be started, then the status will be
NOT_HANDSHAKING
- while ((hsStatus != HandshakeStatus.FINISHED) &&
- (hsStatus != HandshakeStatus.NOT_HANDSHAKING ) &&
- processingData) {
+ while ((hsStatus != HandshakeStatus.FINISHED) && (hsStatus !=
HandshakeStatus.NOT_HANDSHAKING)
+ && processingData) {
switch (hsStatus) {
- case NEED_TASK :
- hsStatus = sslHelper.processTasks(engine);
-
- break;
-
- case NEED_WRAP :
- if ( LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} processing the NEED_WRAP state",
session);
- }
-
- int capacity =
engine.getSession().getPacketBufferSize();
- ByteBuffer outBuffer = ByteBuffer.allocate(capacity);
- SSLEngineResult result = null;
-
- while (true) {
- result = engine.wrap(EMPTY_BUFFER, outBuffer);
-
- if (result.getStatus() ==
SSLEngineResult.Status.BUFFER_OVERFLOW) {
- // TODO : increase the AppBuffer size
- } else {
- break;
- }
- }
-
- outBuffer.flip();
- session.write(outBuffer);
- hsStatus = result.getHandshakeStatus();
-
- // We continue to loop while we don't expect messages
to unwrap,
- // otherwise, we have to exit the loop.
- processingData = (hsStatus !=
HandshakeStatus.NEED_UNWRAP);
+ case NEED_TASK:
+ hsStatus = sslHelper.processTasks(engine);
- break;
-
- case NEED_UNWRAP :
- Status status = sslHelper.processUnwrap(engine,
inBuffer, EMPTY_BUFFER);
-
- if ( status == Status.BUFFER_UNDERFLOW) {
- // Read more data
- processingData = false;
+ break;
+
+ case NEED_WRAP:
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("{} processing the NEED_WRAP state",
session);
+ }
+
+ int capacity = engine.getSession().getPacketBufferSize();
+ ByteBuffer outBuffer = ByteBuffer.allocate(capacity);
+ SSLEngineResult result = null;
+
+ while (true) {
+ result = engine.wrap(EMPTY_BUFFER, outBuffer);
+
+ if (result.getStatus() ==
SSLEngineResult.Status.BUFFER_OVERFLOW) {
+ // TODO : increase the AppBuffer size
} else {
- hsStatus = engine.getHandshakeStatus();
+ break;
}
-
- break;
+ }
+
+ outBuffer.flip();
+ session.write(outBuffer);
+ hsStatus = result.getHandshakeStatus();
+
+ // We continue to loop while we don't expect messages to
unwrap,
+ // otherwise, we have to exit the loop.
+ processingData = (hsStatus != HandshakeStatus.NEED_UNWRAP);
+
+ break;
+
+ case NEED_UNWRAP:
+ Status status = sslHelper.processUnwrap(engine, inBuffer,
EMPTY_BUFFER);
+
+ if (status == Status.BUFFER_UNDERFLOW) {
+ // Read more data
+ processingData = false;
+ } else {
+ hsStatus = engine.getHandshakeStatus();
+ }
+
+ break;
}
}
-
+
if (hsStatus == HandshakeStatus.FINISHED) {
- if ( LOGGER.isDebugEnabled()) {
+ if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} processing the FINISHED state", session);
}
-
+
session.changeState(SessionState.SECURED);
handshaking = false;
return true;
}
-
+
return false;
}
-
+
/**
* Processes the Write action for the given SelectionKey
*/
private void processWrite(SelectionKey key) throws IOException {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("writable session : {}", key.attachment());
- }
-
NioTcpSession session = (NioTcpSession) key.attachment();
+
+ LOGGER.debug("writable session : {}", session);
+
session.setNotRegisteredForWrite();
// write from the session write queue
boolean isEmpty = false;
-
+
try {
Queue<WriteRequest> queue = session.acquireWriteQueue();
do {
// get a write request from the queue
WriteRequest wreq = queue.peek();
-
+
if (wreq == null) {
break;
}
-
+
ByteBuffer buf = (ByteBuffer) wreq.getMessage();
int wrote = session.getSocketChannel().write(buf);
@@ -606,7 +608,7 @@ public class NioSelectorProcessor implem
queue.remove();
// complete the future
DefaultWriteFuture future = (DefaultWriteFuture)
wreq.getFuture();
-
+
if (future != null) {
future.complete();
}
@@ -617,7 +619,7 @@ public class NioSelectorProcessor implem
break;
}
} while (!queue.isEmpty());
-
+
isEmpty = queue.isEmpty();
} finally {
session.releaseWriteQueue();
@@ -626,22 +628,28 @@ public class NioSelectorProcessor implem
// if the session is no more interested in writing, we need
// to stop listening for OP_WRITE events
if (isEmpty) {
- // a key registered for read ? (because we can have a
- // Selector for reads and another for the writes
- SelectionKey readKey = sessionReadKey.get(session);
-
- if (readKey != null) {
- LOGGER.debug("registering key for only reading");
- SelectionKey mykey =
session.getSocketChannel().register(selector,
- SelectionKey.OP_READ, session);
- sessionReadKey.put(session, mykey);
+ if (session.isClosing()) {
+ LOGGER.debug("closing session {} have empty write queue,
so we close it", session);
+ // we was flushing writes, now we to the close
+ session.getSocketChannel().close();
} else {
- LOGGER.debug("cancel key for writing");
- session.getSocketChannel().keyFor(selector).cancel();
+ // a key registered for read ? (because we can have a
+ // Selector for reads and another for the writes
+ SelectionKey readKey = sessionReadKey.get(session);
+
+ if (readKey != null) {
+ LOGGER.debug("registering key for only reading");
+ SelectionKey mykey =
session.getSocketChannel().register(selector, SelectionKey.OP_READ,
+ session);
+ sessionReadKey.put(session, mykey);
+ } else {
+ LOGGER.debug("cancel key for writing");
+ session.getSocketChannel().keyFor(selector).cancel();
+ }
}
}
}
-
+
/**
* Flushes the sessions
*/
@@ -650,7 +658,7 @@ public class NioSelectorProcessor implem
// a key registered for read ? (because we can have a
// Selector for reads and another for the writes
SelectionKey readKey = sessionReadKey.get(session);
-
+
if (readKey != null) {
// register for read/write
SelectionKey key =
session.getSocketChannel().register(selector,
Modified:
mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java
URL:
http://svn.apache.org/viewvc/mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java?rev=1212998&r1=1212997&r2=1212998&view=diff
==============================================================================
---
mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java
(original)
+++
mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java
Sun Dec 11 12:53:04 2011
@@ -107,6 +107,7 @@ public class HttpTest {
session.write(new DefaultHttpResponse(HttpVersion.HTTP_1_1,
HttpStatus.SUCCESS_OK, headers));
session.write(content);
session.write(new HttpEndOfContent());
+ session.close(false);
}
}