Author: elecharny
Date: Tue Dec 6 17:09:17 2011
New Revision: 1211031
URL: http://svn.apache.org/viewvc?rev=1211031&view=rev
Log:
o Added the SslHelper class to manage the SSL communication and handshake
o Renamed addSslContext to setSslContext
o Added the IoSession.initSecure(SSLContext) method
o Refactored the NioSelectorProcessor by extracting all the actions from the
main loop, to make it clearer (some methods have been added)
Added:
mina/trunk/core/src/main/java/org/apache/mina/session/SslHelper.java
Modified:
mina/trunk/core/src/main/java/org/apache/mina/api/IoService.java
mina/trunk/core/src/main/java/org/apache/mina/api/IoSession.java
mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
Modified: mina/trunk/core/src/main/java/org/apache/mina/api/IoService.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/api/IoService.java?rev=1211031&r1=1211030&r2=1211031&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/api/IoService.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/api/IoService.java Tue Dec 6
17:09:17 2011
@@ -97,10 +97,11 @@ public interface IoService {
* Inject a {@link SSLContex} valid for the service. This {@link
SSLContex} will be used
* by the SSLEngine to handle secured connections.<br/>
* The {@link SSLContex} must have been created and initialized before
being injected in
- * the service.
+ * the service.<br/>
+ * By setting a {@link SSLContext}, the service switches to secured mode.
* @param sslContext The configured {@link SSLContex}.
*/
- void addSslContext(SSLContext sslContext);
+ void setSslContext(SSLContext sslContext);
/**
* @return The {@link SSLContext} instance stored in the service.
Modified: mina/trunk/core/src/main/java/org/apache/mina/api/IoSession.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/api/IoSession.java?rev=1211031&r1=1211030&r2=1211031&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/api/IoSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/api/IoSession.java Tue Dec 6
17:09:17 2011
@@ -23,6 +23,9 @@ import java.net.SocketAddress;
import java.util.Queue;
import java.util.Set;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLException;
+
import org.apache.mina.filterchain.IoFilterController;
import org.apache.mina.service.SelectorProcessor;
import org.apache.mina.session.WriteRequest;
@@ -50,7 +53,9 @@ import org.apache.mina.session.WriteRequ
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
public interface IoSession {
-
+ /** The SslHelper instance name, stored in the session's attributes */
+ static final String SSL_HELPER = "internal_sslHelper";
+
/**
* The unique identifier of this session.
*
@@ -117,6 +122,14 @@ public interface IoSession {
* @return <code>true</tt> if and only if this session is belonging a
secured connection.
*/
boolean isSecured();
+
+ /**
+ * Initializes the SSL/TLS environment for this session.
+ *
+ * @param sslContext The SSLContext instance to use.
+ * @throws SSLException If the SSL/TLS configuration can't be initialized
+ */
+ void initSecure(SSLContext sslContext) throws SSLException;
/**
* Tells if the session is using SSL/TLS.
Modified:
mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java?rev=1211031&r1=1211030&r2=1211031&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
Tue Dec 6 17:09:17 2011
@@ -29,6 +29,9 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLException;
+
import org.apache.mina.api.IoFuture;
import org.apache.mina.api.IoService;
import org.apache.mina.api.IoSession;
@@ -185,6 +188,18 @@ public abstract class AbstractIoSession
public void setSecured(boolean secured) {
this.secured = secured;
}
+
+ /**
+ * {@inheritDoc}
+ */
+ public void initSecure(SSLContext sslContext) throws SSLException {
+ SslHelper sslHelper = new SslHelper(this, sslContext);
+ sslHelper.init();
+
+ attributes.put(SSL_HELPER, sslHelper);
+ setSecured(true);
+ }
+
/**
* {@inheritDoc}
Added: mina/trunk/core/src/main/java/org/apache/mina/session/SslHelper.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/session/SslHelper.java?rev=1211031&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/session/SslHelper.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/session/SslHelper.java Tue
Dec 6 17:09:17 2011
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.session;
+
+import java.net.InetSocketAddress;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLSession;
+
+import org.apache.mina.api.IoClient;
+import org.apache.mina.api.IoSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A helper class used to manage everything related to SSL/TLS establishment
+ * and management.
+ *
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+/** No qualifier */ class SslHelper
+{
+ /** A logger for this class */
+ private final static Logger LOGGER =
LoggerFactory.getLogger(SslHelper.class);
+
+ /** The SSL engine instance */
+ private SSLEngine sslEngine;
+
+ /** The SSLContext instance */
+ private final SSLContext sslContext;
+
+ /** The current session */
+ private final IoSession session;
+
+ /**
+ * A session attribute key that should be set to an {@link
InetSocketAddress}.
+ * Setting this attribute causes
+ * {@link SSLContext#createSSLEngine(String, int)} to be called passing the
+ * hostname and port of the {@link InetSocketAddress} to get an
+ * {@link SSLEngine} instance. If not set {@link
SSLContext#createSSLEngine()}
+ * will be called.<br/>
+ * Using this feature {@link SSLSession} objects may be cached and reused
+ * when in client mode.
+ *
+ * @see SSLContext#createSSLEngine(String, int)
+ */
+ public static final String PEER_ADDRESS = "internal_peerAddress";
+
+ public static final String WANT_CLIENT_AUTH = "internal_wantClientAuth";
+
+ public static final String NEED_CLIENT_AUTH = "internal_needClientAuth";
+
+ /** The Handshake status */
+ private SSLEngineResult.HandshakeStatus handshakeStatus;
+
+ /**
+ * Create a new SSL Handler.
+ *
+ * @param session The associated session
+ * @throws SSLException
+ */
+ SslHelper(IoSession session, SSLContext sslContext) throws SSLException {
+ this.session = session;
+ this.sslContext = sslContext;
+ }
+
+ /**
+ * @return The associated session
+ */
+ /* no qualifier */ IoSession getSession() {
+ return session;
+ }
+
+
+ /**
+ * @return The associated SSLEngine
+ */
+ /* no qualifier */ SSLEngine getEngine() {
+ return sslEngine;
+ }
+
+ /**
+ * Initialize the SSL handshake.
+ *
+ * @throws SSLException If the underlying SSLEngine handshake
initialization failed
+ */
+ /* no qualifier */ void init() throws SSLException {
+ if (sslEngine != null) {
+ // We already have a SSL engine created, no need to create a new
one
+ return;
+ }
+
+ LOGGER.debug("{} Initializing the SSL Helper", session);
+
+ InetSocketAddress peer = (InetSocketAddress)
session.getAttribute(PEER_ADDRESS);
+
+ // Create the SSL engine here
+ if (peer == null) {
+ sslEngine = sslContext.createSSLEngine();
+ } else {
+ sslEngine = sslContext.createSSLEngine(peer.getHostName(),
peer.getPort());
+ }
+
+ // Initialize the engine in client mode if necessary
+ sslEngine.setUseClientMode(session.getService() instanceof IoClient);
+
+ // Initialize the different SslEngine modes
+ if (!sslEngine.getUseClientMode()) {
+ // Those parameters are only valid when in server mode
+ boolean needClientAuth =
session.<Boolean>getAttribute(NEED_CLIENT_AUTH);
+ boolean wantClientAuth =
session.<Boolean>getAttribute(WANT_CLIENT_AUTH);
+
+ // WantClientAuth supersedes NeedClientAuth, if set.
+ if (needClientAuth) {
+ sslEngine.setNeedClientAuth(true);
+ }
+
+ if (wantClientAuth) {
+ sslEngine.setWantClientAuth(true);
+ }
+ }
+
+ handshakeStatus = sslEngine.getHandshakeStatus();
+
+ if ( LOGGER.isDebugEnabled()) {
+ LOGGER.debug("{} SSL Handler Initialization done.", session);
+ }
+ }
+}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java?rev=1211031&r1=1211030&r2=1211031&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
Tue Dec 6 17:09:17 2011
@@ -36,6 +36,8 @@ import java.util.concurrent.ConcurrentLi
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import javax.net.ssl.SSLException;
+
import org.apache.mina.api.IoServer;
import org.apache.mina.api.IoService;
import org.apache.mina.api.IoSession;
@@ -176,7 +178,7 @@ public class NioSelectorProcessor implem
* {@inheritDoc}
*/
@Override
- public void createSession(IoService service, Object clientSocket) {
+ public void createSession(IoService service, Object clientSocket) throws
SSLException {
LOGGER.debug("create session");
final SocketChannel socketChannel = (SocketChannel) clientSocket;
final SocketSessionConfig defaultConfig = (SocketSessionConfig)
service.getSessionConfig();
@@ -239,9 +241,9 @@ public class NioSelectorProcessor implem
session.getConfig().setSoLinger(soLinger);
}
- // Set the secured fag if the service is to be used over SSL/TLS
+ // Set the secured flag if the service is to be used over SSL/TLS
if (service.isSecured()) {
- session.setSecured(true);
+ session.initSecure( service.getSslContext() );
}
// event session created
@@ -315,39 +317,12 @@ public class NioSelectorProcessor implem
// pop new session for starting read/write
if (sessionsToConnect.size() > 0) {
- while (!sessionsToConnect.isEmpty()) {
- NioTcpSession session = sessionsToConnect.poll();
- SelectionKey key =
session.getSocketChannel().register(selector, SelectionKey.OP_READ);
- key.attach(session);
- sessionReadKey.put(session, key);
-
- // Switch to CONNECTED, only if the session is not
secured, as the SSL Handshake
- // will occur later.
- if (!session.isSecured()) {
- session.setConnected();
-
- // fire the event
- ((AbstractIoService)
session.getService()).fireSessionCreated(session);
-
session.getFilterChain().processSessionOpened(session);
- }
- }
+ processConnectSessions();
}
- // pop session for close
+ // pop session for close, if any
if (sessionsToClose.size() > 0) {
- while (!sessionsToClose.isEmpty()) {
- NioTcpSession session = sessionsToClose.poll();
-
- SelectionKey key = sessionReadKey.remove(session);
- key.cancel();
-
- // closing underlying socket
- session.getSocketChannel().close();
- // fire the event
-
session.getFilterChain().processSessionClosed(session);
- ((AbstractIoService)
session.getService()).fireSessionDestroyed(session);
-
- }
+ processCloseSessions();
}
LOGGER.debug("selecting...");
@@ -358,6 +333,7 @@ public class NioSelectorProcessor implem
// process selected keys
Iterator<SelectionKey> selectedKeys =
selector.selectedKeys().iterator();
+ // Loop on each SelectionKey and process any valid
action
while (selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
@@ -369,134 +345,23 @@ public class NioSelectorProcessor implem
selector.selectedKeys().remove(key);
if (key.isReadable()) {
- LOGGER.debug("readable client {}", key);
- NioTcpSession session = (NioTcpSession)
key.attachment();
- SocketChannel channel =
session.getSocketChannel();
- readBuffer.rewind();
- int readCount = channel.read(readBuffer);
- LOGGER.debug("read {} bytes", readCount);
-
- if (readCount < 0) {
- // session closed by the remote peer
- LOGGER.debug("session closed by the remote
peer");
- sessionsToClose.add(session);
- } else {
- // we have read some data
- // limit at the current position & rewind
buffer back to start & push to the chain
- readBuffer.flip();
-
- if (session.isSecured() &&
!session.isConnectedSecured()) {
- // Process the SSL handshake now
- //processHandShake(session,
readBuffer);
- } else {
-
session.getFilterChain().processMessageReceived(session, readBuffer);
- }
- }
+ processRead(key);
}
if (key.isWritable()) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("writable session : {}",
key.attachment());
- }
- NioTcpSession session = (NioTcpSession)
key.attachment();
- session.setNotRegisteredForWrite();
-
- // write from the session write queue
- boolean isEmpty = false;
-
- try {
- Queue<WriteRequest> queue =
session.acquireWriteQueue();
-
- do {
- // get a write request from the queue
- WriteRequest wreq = queue.peek();
-
- if (wreq == null) {
- break;
- }
-
- ByteBuffer buf = (ByteBuffer)
wreq.getMessage();
-
- int wrote =
session.getSocketChannel().write(buf);
-
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("wrote {} bytes to
{}", wrote, session);
- }
-
- if (buf.remaining() == 0) {
- // completed write request, let's
remove
- // it
- queue.remove();
- // complete the future
- DefaultWriteFuture future =
(DefaultWriteFuture) wreq.getFuture();
-
- if (future != null) {
- future.complete();
- }
- } else {
- // output socket buffer is full,
we need
- // to give up until next selection
for
- // writing
- break;
- }
- } while (!queue.isEmpty());
-
- isEmpty = queue.isEmpty();
- } finally {
- session.releaseWriteQueue();
- }
-
- // if the session is no more interested in
writing, we need
- // to stop listening for OP_WRITE events
- if (isEmpty) {
- // a key registered for read ? (because we
can have a
- // Selector for reads and another for the
writes
- SelectionKey readKey =
sessionReadKey.get(session);
-
- if (readKey != null) {
- LOGGER.debug("registering key for only
reading");
- SelectionKey mykey =
session.getSocketChannel().register(selector,
- SelectionKey.OP_READ, session);
- sessionReadKey.put(session, mykey);
- } else {
- LOGGER.debug("cancel key for writing");
-
session.getSocketChannel().keyFor(selector).cancel();
- }
- }
+ processWrite(key);
}
if (key.isAcceptable()) {
- LOGGER.debug("acceptable new client {}", key);
- ServerSocketChannel serverSocket =
(ServerSocketChannel) ((Object[]) key.attachment())[0];
- IoServer server = (IoServer) (((Object[])
key.attachment())[1]);
- // accepted connection
- SocketChannel newClientChannel =
serverSocket.accept();
- LOGGER.debug("client accepted");
- // and give it's to the strategy
-
strategy.getSelectorForNewSession(NioSelectorProcessor.this).createSession(server,
- newClientChannel);
+ processAccept(key);
}
-
}
}
// registering session with data in the write queue for
// writing
while (!flushingSessions.isEmpty()) {
- NioTcpSession session = flushingSessions.poll();
- // a key registered for read ? (because we can have a
- // Selector for reads and another for the writes
- SelectionKey readKey = sessionReadKey.get(session);
- if (readKey != null) {
- // register for read/write
- SelectionKey key =
session.getSocketChannel().register(selector,
- SelectionKey.OP_READ |
SelectionKey.OP_WRITE, session);
-
- sessionReadKey.put(session, key);
-
- } else {
- session.getSocketChannel().register(selector,
SelectionKey.OP_WRITE, session);
- }
+ processFushSessions();
}
} catch (IOException e) {
LOGGER.error("IOException while selecting selector", e);
@@ -514,5 +379,183 @@ public class NioSelectorProcessor implem
}
}
}
+
+ /**
+ * Handles all the sessions that must be connected
+ */
+ private void processConnectSessions() throws IOException {
+ while (!sessionsToConnect.isEmpty()) {
+ NioTcpSession session = sessionsToConnect.poll();
+ SelectionKey key =
session.getSocketChannel().register(selector, SelectionKey.OP_READ);
+ key.attach(session);
+ sessionReadKey.put(session, key);
+
+ // Switch to CONNECTED, only if the session is not secured, as
the SSL Handshake
+ // will occur later.
+ if (!session.isSecured()) {
+ session.setConnected();
+
+ // fire the event
+ ((AbstractIoService)
session.getService()).fireSessionCreated(session);
+ session.getFilterChain().processSessionOpened(session);
+ }
+ }
+ }
+
+ /**
+ * Handles all the sessions that must be closed
+ */
+ private void processCloseSessions() throws IOException {
+ while (!sessionsToClose.isEmpty()) {
+ NioTcpSession session = sessionsToClose.poll();
+
+ SelectionKey key = sessionReadKey.remove(session);
+ key.cancel();
+
+ // closing underlying socket
+ session.getSocketChannel().close();
+ // fire the event
+ session.getFilterChain().processSessionClosed(session);
+ ((AbstractIoService)
session.getService()).fireSessionDestroyed(session);
+ }
+ }
+
+ /**
+ * Processes the Accept action for the given SelectionKey
+ */
+ private void processAccept(SelectionKey key) throws IOException {
+ LOGGER.debug("acceptable new client {}", key);
+ ServerSocketChannel serverSocket = (ServerSocketChannel)
((Object[]) key.attachment())[0];
+ IoServer server = (IoServer) (((Object[]) key.attachment())[1]);
+ // accepted connection
+ SocketChannel newClientChannel = serverSocket.accept();
+ LOGGER.debug("client accepted");
+ // and give it to the strategy
+
strategy.getSelectorForNewSession(NioSelectorProcessor.this).createSession(server,
+ newClientChannel);
+ }
+
+ /**
+ * Processes the Read action for the given SelectionKey
+ */
+ private void processRead(SelectionKey key) throws IOException{
+ LOGGER.debug("readable client {}", key);
+ NioTcpSession session = (NioTcpSession) key.attachment();
+ SocketChannel channel = session.getSocketChannel();
+ readBuffer.rewind();
+ int readCount = channel.read(readBuffer);
+ LOGGER.debug("read {} bytes", readCount);
+
+ if (readCount < 0) {
+ // session closed by the remote peer
+ LOGGER.debug("session closed by the remote peer");
+ sessionsToClose.add(session);
+ } else {
+ // we have read some data
+ // limit at the current position & rewind buffer back to start
& push to the chain
+ readBuffer.flip();
+
+ if (session.isSecured() && !session.isConnectedSecured()) {
+ // Process the SSL handshake now
+ //processHandShake(session, readBuffer);
+ } else {
+ session.getFilterChain().processMessageReceived(session,
readBuffer);
+ }
+ }
+ }
+
+ /**
+ * Processes the Write action for the given SelectionKey
+ */
+ private void processWrite(SelectionKey key) throws IOException {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("writable session : {}", key.attachment());
+ }
+ NioTcpSession session = (NioTcpSession) key.attachment();
+ session.setNotRegisteredForWrite();
+
+ // write from the session write queue
+ boolean isEmpty = false;
+
+ try {
+ Queue<WriteRequest> queue = session.acquireWriteQueue();
+
+ do {
+ // get a write request from the queue
+ WriteRequest wreq = queue.peek();
+
+ if (wreq == null) {
+ break;
+ }
+
+ ByteBuffer buf = (ByteBuffer) wreq.getMessage();
+
+ int wrote = session.getSocketChannel().write(buf);
+
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("wrote {} bytes to {}", wrote, session);
+ }
+
+ if (buf.remaining() == 0) {
+ // completed write request, let's remove
+ // it
+ queue.remove();
+ // complete the future
+ DefaultWriteFuture future = (DefaultWriteFuture)
wreq.getFuture();
+
+ if (future != null) {
+ future.complete();
+ }
+ } else {
+ // output socket buffer is full, we need
+ // to give up until next selection for
+ // writing
+ break;
+ }
+ } while (!queue.isEmpty());
+
+ isEmpty = queue.isEmpty();
+ } finally {
+ session.releaseWriteQueue();
+ }
+
+ // if the session is no more interested in writing, we need
+ // to stop listening for OP_WRITE events
+ if (isEmpty) {
+ // a key registered for read ? (because we can have a
+ // Selector for reads and another for the writes
+ SelectionKey readKey = sessionReadKey.get(session);
+
+ if (readKey != null) {
+ LOGGER.debug("registering key for only reading");
+ SelectionKey mykey =
session.getSocketChannel().register(selector,
+ SelectionKey.OP_READ, session);
+ sessionReadKey.put(session, mykey);
+ } else {
+ LOGGER.debug("cancel key for writing");
+ session.getSocketChannel().keyFor(selector).cancel();
+ }
+ }
+ }
+
+ /**
+ * Flushes the sessions
+ */
+ private void processFushSessions() throws IOException {
+ NioTcpSession session = flushingSessions.poll();
+ // a key registered for read ? (because we can have a
+ // Selector for reads and another for the writes
+ SelectionKey readKey = sessionReadKey.get(session);
+
+ if (readKey != null) {
+ // register for read/write
+ SelectionKey key =
session.getSocketChannel().register(selector,
+ SelectionKey.OP_READ | SelectionKey.OP_WRITE, session);
+
+ sessionReadKey.put(session, key);
+ } else {
+ session.getSocketChannel().register(selector,
SelectionKey.OP_WRITE, session);
+ }
+ }
}
}