Author: elecharny
Date: Mon Dec 12 20:58:31 2011
New Revision: 1213441
URL: http://svn.apache.org/viewvc?rev=1213441&view=rev
Log:
o The IoSession is made responsible for encrypting the data when in secured mode
o Moved out of NioSelectorProcessor everything which is SSL related (moved to
SslHelper)
o Get rid of many flags that are not necessary when dealing with SSL handshake
o Used a wider buffer for Handshake unwrap
o Refactored the complete Handshake logic
o Made most of the SslHelper methods private
o Added some missing buffer size increases
Modified:
mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
mina/trunk/core/src/main/java/org/apache/mina/session/SslHelper.java
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
Modified:
mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java?rev=1213441&r1=1213440&r2=1213441&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
Mon Dec 12 20:58:31 2011
@@ -457,7 +457,8 @@ public abstract class AbstractIoSession
private void doWriteWithFuture(Object message, IoFuture<Void> future) {
LOG.debug("writing message {} to session {}", message, this);
- if (state == SessionState.CLOSED || state == SessionState.CLOSING) {
+
+ if ((state == SessionState.CLOSED) || (state == SessionState.CLOSING))
{
LOG.error("writing to closed or closing session, the message is
discarded");
return;
}
@@ -470,11 +471,24 @@ public abstract class AbstractIoSession
* {@inheritDoc}
*/
public WriteRequest enqueueWriteRequest(Object message) {
- DefaultWriteRequest request = new DefaultWriteRequest(message);
+ DefaultWriteRequest request = null;
try {
writeQueueReadLock.lock();
- writeQueue.add(request);
+
+ if ( isConnectedSecured()) {
+ SslHelper sslHelper = getAttribute( IoSession.SSL_HELPER );
+
+ if (sslHelper == null) {
+ throw new IllegalStateException();
+ }
+
+ request = sslHelper.processWrite(this, message, writeQueue);
+ } else {
+ request = new DefaultWriteRequest(message);
+
+ writeQueue.add(request);
+ }
} finally {
writeQueueReadLock.unlock();
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/session/SslHelper.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/session/SslHelper.java?rev=1213441&r1=1213440&r2=1213441&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/session/SslHelper.java
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/session/SslHelper.java Mon
Dec 12 20:58:31 2011
@@ -21,6 +21,7 @@ package org.apache.mina.session;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
+import java.util.Queue;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
@@ -32,6 +33,7 @@ import javax.net.ssl.SSLSession;
import org.apache.mina.api.IoClient;
import org.apache.mina.api.IoSession;
+import org.apache.mina.api.IoSession.SessionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,6 +57,9 @@ public class SslHelper
/** The current session */
private final IoSession session;
+ /** A flag set when we process the handshake */
+ private boolean handshaking = false;
+
/**
* A session attribute key that should be set to an {@link
InetSocketAddress}.
* Setting this attribute causes
@@ -73,22 +78,25 @@ public class SslHelper
public static final String NEED_CLIENT_AUTH = "internal_needClientAuth";
- /** The Handshake status */
- private SSLEngineResult.HandshakeStatus handshakeStatus;
-
/** Application cleartext data to be read by application */
private ByteBuffer appBuffer;
/** Incoming buffer accumulating bytes read from the channel */
private ByteBuffer accBuffer;
+ /** An empty buffer used during the handshake phase */
+ private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
+
+ /** A buffer receiving the unwrapped data during the handshake phase */
+ private static final ByteBuffer HANDSHAKE_BUFFER =
ByteBuffer.allocate(1024);
+
/**
* Create a new SSL Handler.
*
* @param session The associated session
* @throws SSLException
*/
- SslHelper(IoSession session, SSLContext sslContext) throws SSLException {
+ public SslHelper(IoSession session, SSLContext sslContext) throws
SSLException {
this.session = session;
this.sslContext = sslContext;
}
@@ -96,7 +104,7 @@ public class SslHelper
/**
* @return The associated session
*/
- /* no qualifier */ IoSession getSession() {
+ private IoSession getSession() {
return session;
}
@@ -104,7 +112,7 @@ public class SslHelper
/**
* @return The associated SSLEngine
*/
- public SSLEngine getEngine() {
+ private SSLEngine getEngine() {
return sslEngine;
}
@@ -139,7 +147,7 @@ public class SslHelper
Boolean needClientAuth =
session.<Boolean>getAttribute(NEED_CLIENT_AUTH);
Boolean wantClientAuth =
session.<Boolean>getAttribute(WANT_CLIENT_AUTH);
- // The WantClientAuth superseed the NeedClientAuth, if set.
+ // The WantClientAuth supersede the NeedClientAuth, if set.
if ((needClientAuth != null) && (needClientAuth)) {
sslEngine.setNeedClientAuth(true);
}
@@ -149,36 +157,11 @@ public class SslHelper
}
}
- handshakeStatus = sslEngine.getHandshakeStatus();
-
if ( LOGGER.isDebugEnabled()) {
LOGGER.debug("{} SSL Handler Initialization done.", session);
}
}
- /**
- * Get the local AppBuffer
- * @return The Application Buffer allocated in the SslHelper, if any
- */
- public ByteBuffer getAppBuffer() {
- return appBuffer;
- }
-
- public ByteBuffer getSslInBuffer(ByteBuffer inBuffer) {
- if (inBuffer == null) {
- inBuffer = inBuffer;
- } else {
- addInBuffer(inBuffer);
- }
-
- return inBuffer;
- }
-
- public void setInBuffer(ByteBuffer inBuffer) {
- inBuffer = ByteBuffer.allocate(16*1024);
- inBuffer.put(inBuffer);
- }
-
private void addInBuffer(ByteBuffer buffer) {
if (accBuffer.capacity() - accBuffer.limit() < buffer.remaining()) {
// Increase the internal buffer
@@ -198,10 +181,6 @@ public class SslHelper
}
}
- public void releaseInBuffer() {
- accBuffer = null;
- }
-
/**
* Process the NEED_TASK action.
*
@@ -209,7 +188,7 @@ public class SslHelper
* @return The resulting HandshakeStatus
* @throws SSLException If we've got an error while processing the tasks
*/
- public HandshakeStatus processTasks(SSLEngine engine) throws SSLException {
+ private HandshakeStatus processTasks(SSLEngine engine) throws SSLException
{
Runnable runnable;
while ((runnable = engine.getDelegatedTask()) != null) {
@@ -238,11 +217,11 @@ public class SslHelper
* @return
* @throws SSLException
*/
- public Status processUnwrap(SSLEngine engine, ByteBuffer inBuffer,
ByteBuffer appBuffer) throws SSLException {
+ private SSLEngineResult unwrap(ByteBuffer inBuffer, ByteBuffer appBuffer)
throws SSLException {
ByteBuffer tempBuffer = null;
// First work with either the new incoming buffer, or the accumulating
buffer
- if ((this.accBuffer != null) && (this.accBuffer.remaining() > 0)) {
+ if ((accBuffer != null) && (accBuffer.remaining() > 0)) {
// Add the new incoming data into the local buffer
addInBuffer(inBuffer);
tempBuffer = this.accBuffer;
@@ -253,13 +232,15 @@ public class SslHelper
// Loop until we have processed the entire incoming buffer,
// or until we have to stop
while (true) {
- SSLEngineResult result = engine.unwrap(tempBuffer, appBuffer);
+ // Do the unwrapping
+ SSLEngineResult result = sslEngine.unwrap(tempBuffer, appBuffer);
switch (result.getStatus()) {
case OK :
+ // Ok, we have unwrapped a message, return.
accBuffer = null;
- return Status.OK;
+ return result;
case BUFFER_UNDERFLOW :
// We need to read some more data from the channel.
@@ -270,12 +251,17 @@ public class SslHelper
inBuffer.clear();
- return Status.BUFFER_UNDERFLOW;
+ return result;
case CLOSED :
- // Get out
accBuffer = null;
- throw new IllegalStateException();
+
+ // We have received a Close message: return it if the session is secured, otherwise fail
+ if (session.isConnectedSecured()) {
+ return result;
+ } else {
+ throw new IllegalStateException();
+ }
case BUFFER_OVERFLOW :
// We have to increase the appBuffer size. In any case
@@ -284,4 +270,170 @@ public class SslHelper
}
}
}
+
+ public void processRead(IoSession session, ByteBuffer readBuffer) throws
SSLException {
+ if (session.isConnectedSecured()) {
+ // Unwrap the incoming data
+ processUnwrap(session, readBuffer);
+ } else {
+ // Process the SSL handshake now
+ processHandShake(session, readBuffer);
+ }
+ }
+
+
+ private void processUnwrap(IoSession session, ByteBuffer inBuffer) throws
SSLException {
+ // Blind guess : once uncompressed, the resulting buffer will be 3
times bigger
+ ByteBuffer appBuffer = ByteBuffer.allocate(inBuffer.limit() * 3);
+ SSLEngineResult result = unwrap(inBuffer, appBuffer );
+
+ switch (result.getStatus()) {
+ case OK :
+ // Ok, go through the chain now
+ appBuffer.flip();
+ session.getFilterChain().processMessageReceived(session,
appBuffer);
+ break;
+
+ case CLOSED :
+ processClosed( result);
+
+ break;
+ }
+ }
+
+ private void processClosed(SSLEngineResult result) throws SSLException {
+ // We have received an Alert_Closure message, we will have to do a wrap
+ HandshakeStatus hsStatus = result.getHandshakeStatus();
+
+ if (hsStatus == HandshakeStatus.NEED_WRAP) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("{} processing the NEED_WRAP state", session);
+ }
+
+ int capacity = sslEngine.getSession().getPacketBufferSize();
+ ByteBuffer outBuffer = ByteBuffer.allocate(capacity);
+ session.changeState( SessionState.CONNECTED );
+
+ while (!sslEngine.isOutboundDone()) {
+ sslEngine.wrap(EMPTY_BUFFER, outBuffer);
+ outBuffer.flip();
+
+ // Get out of the Connected state
+ session.enqueueWriteRequest(outBuffer);
+ }
+ }
+ }
+
+ private boolean processHandShake(IoSession session, ByteBuffer inBuffer)
throws SSLException {
+ // Start the Handshake if we aren't already processing a HandShake
+ // and switch to the SECURING state
+ HandshakeStatus hsStatus = sslEngine.getHandshakeStatus();
+
+ if ( hsStatus == HandshakeStatus.NOT_HANDSHAKING) {
+ session.changeState(SessionState.SECURING);
+ }
+
+ SSLEngineResult result = null;
+
+ // If the SSLEngine has not been started, then the status will be
NOT_HANDSHAKING
+ while (hsStatus != HandshakeStatus.FINISHED) {
+ if (hsStatus == HandshakeStatus.NEED_TASK) {
+ hsStatus = processTasks(sslEngine);
+ } else if (hsStatus == HandshakeStatus.NEED_WRAP) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("{} processing the NEED_WRAP state", session);
+ }
+
+ int capacity = sslEngine.getSession().getPacketBufferSize();
+ ByteBuffer outBuffer = ByteBuffer.allocate(capacity);
+
+ boolean completed = false;
+
+ while (!completed) {
+ result = sslEngine.wrap(EMPTY_BUFFER, outBuffer);
+
+ switch (result.getStatus()) {
+ case OK :
+ case CLOSED :
+ completed = true;
+ break;
+
+ case BUFFER_OVERFLOW :
+ ByteBuffer newBuffer =
ByteBuffer.allocate(outBuffer.capacity() + 4096);
+ outBuffer.flip();
+ newBuffer.put(outBuffer);
+ outBuffer = newBuffer;
+ break;
+ }
+ }
+
+ outBuffer.flip();
+ session.enqueueWriteRequest(outBuffer);
+ hsStatus = result.getHandshakeStatus();
+
+ if (hsStatus != HandshakeStatus.NEED_WRAP) {
+ break;
+ }
+ } else if ((hsStatus == HandshakeStatus.NEED_UNWRAP) || (hsStatus
== HandshakeStatus.NOT_HANDSHAKING)) {
+ result = unwrap(inBuffer, HANDSHAKE_BUFFER);
+
+ if (result.getStatus() == Status.BUFFER_UNDERFLOW) {
+ // Read more data
+ break;
+ } else {
+ hsStatus = result.getHandshakeStatus();
+ }
+ }
+ }
+
+ if (hsStatus == HandshakeStatus.FINISHED) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("{} processing the FINISHED state", session);
+ }
+
+ HandshakeStatus stat = sslEngine.getHandshakeStatus();
+
+ session.changeState(SessionState.SECURED);
+ handshaking = false;
+
+ return true;
+ }
+
+ return false;
+ }
+
+ public DefaultWriteRequest processWrite(IoSession sessions, Object
message, Queue<WriteRequest> writeQueue) {
+ ByteBuffer buf = (ByteBuffer)message;
+ ByteBuffer appBuffer = ByteBuffer.allocate(buf.limit() + 50);
+
+ try {
+ while (true) {
+ SSLEngineResult result = sslEngine.wrap(buf, appBuffer);
+
+ switch (result.getStatus()) {
+ case BUFFER_OVERFLOW :
+ // Increase the buffer size
+ appBuffer = ByteBuffer.allocate(appBuffer.capacity() +
4096);
+ break;
+
+ case BUFFER_UNDERFLOW :
+ case CLOSED :
+ break;
+ case OK :
+ DefaultWriteRequest request = new
DefaultWriteRequest(appBuffer);
+
+ writeQueue.add(request);
+ return request;
+ }
+
+ if ( result.getStatus() ==
SSLEngineResult.Status.BUFFER_OVERFLOW ) {
+ // Increase the buffer size
+ appBuffer = ByteBuffer.allocate(appBuffer.capacity() +
4096);
+ } else {
+ }
+ }
+ } catch (SSLException se) {
+ throw new IllegalStateException(se.getMessage());
+ }
+ }
}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java?rev=1213441&r1=1213440&r2=1213441&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
Mon Dec 12 20:58:31 2011
@@ -36,16 +36,11 @@ import java.util.concurrent.ConcurrentLi
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLEngineResult;
-import javax.net.ssl.SSLEngineResult.HandshakeStatus;
-import javax.net.ssl.SSLEngineResult.Status;
import javax.net.ssl.SSLException;
import org.apache.mina.api.IoServer;
import org.apache.mina.api.IoService;
import org.apache.mina.api.IoSession;
-import org.apache.mina.api.IoSession.SessionState;
import org.apache.mina.api.RuntimeIoException;
import org.apache.mina.service.AbstractIoService;
import org.apache.mina.service.SelectorProcessor;
@@ -82,9 +77,6 @@ public class NioSelectorProcessor implem
/** Application buffer for all the outgoing messages */
private ByteBuffer appBuffer = ByteBuffer.allocate(16 * 1024);
- /** An empty buffer used during the handshake phase */
- private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
-
// the thread polling and processing the I/O events
private SelectorWorker worker = null;
@@ -283,9 +275,6 @@ public class NioSelectorProcessor implem
// map for finding read keys associated with a given session
private Map<NioTcpSession, SelectionKey> sessionReadKey = new
HashMap<NioTcpSession, SelectionKey>();
-
- private boolean handshaking = false;
-
@Override
public void run() {
try {
@@ -372,7 +361,7 @@ public class NioSelectorProcessor implem
// registering session with data in the write queue for
// writing
while (!flushingSessions.isEmpty()) {
- processFushSessions();
+ processFlushSessions();
}
} catch (IOException e) {
LOGGER.error("IOException while selecting selector",
e);
@@ -459,7 +448,7 @@ public class NioSelectorProcessor implem
int readCount = channel.read(readBuffer);
LOGGER.debug("read {} bytes", readCount);
-
+
if (readCount < 0) {
// session closed by the remote peer
LOGGER.debug("session closed by the remote peer");
@@ -470,13 +459,13 @@ public class NioSelectorProcessor implem
readBuffer.flip();
if (session.isSecured()) {
- if (!session.isConnectedSecured()) {
- // Process the SSL handshake now
- processHandShake(session, readBuffer);
- } else {
- // Unwrap the incoming data
- processUnwrap(session, readBuffer);
+ SslHelper sslHelper =
session.getAttribute(IoSession.SSL_HELPER);
+
+ if (sslHelper == null) {
+ throw new IllegalStateException();
}
+
+ sslHelper.processRead(session, readBuffer);
} else {
// Plain message, not encrypted : go directly to the chain
session.getFilterChain().processMessageReceived(session,
readBuffer);
@@ -484,113 +473,6 @@ public class NioSelectorProcessor implem
}
}
- private void processUnwrap(IoSession session, ByteBuffer inBuffer)
throws SSLException {
- SslHelper sslHelper = session.getAttribute(IoSession.SSL_HELPER);
-
- if (sslHelper == null) {
- throw new IllegalStateException();
- }
-
- SSLEngine engine = sslHelper.getEngine();
-
- // Blind guess : once uncompressed, the resulting buffer will be 3
times bigger
- ByteBuffer appBuffer = ByteBuffer.allocate(inBuffer.limit() * 3);
- Status status = sslHelper.processUnwrap(engine, inBuffer,
appBuffer);
- appBuffer.flip();
-
- if (status == Status.OK) {
- // Ok, go through the chain now
- session.getFilterChain().processMessageReceived(session,
appBuffer);
- }
- }
-
- private boolean processHandShake(IoSession session, ByteBuffer
inBuffer) throws SSLException {
- SslHelper sslHelper = session.getAttribute(IoSession.SSL_HELPER);
-
- if (sslHelper == null) {
- throw new IllegalStateException();
- }
-
- SSLEngine engine = sslHelper.getEngine();
- HandshakeStatus hsStatus = engine.getHandshakeStatus();
- boolean processingData = true;
-
- // Start the Handshake if we aren't already processing a HandShake
- // and switch to the SECURING state
- if (!handshaking) {
- engine.beginHandshake();
- handshaking = true;
- session.changeState(SessionState.SECURING);
- }
-
- hsStatus = engine.getHandshakeStatus();
-
- // If the SSLEngine has not be started, then the status will be
NOT_HANDSHAKING
- while ((hsStatus != HandshakeStatus.FINISHED) && (hsStatus !=
HandshakeStatus.NOT_HANDSHAKING)
- && processingData) {
- switch (hsStatus) {
- case NEED_TASK:
- hsStatus = sslHelper.processTasks(engine);
-
- break;
-
- case NEED_WRAP:
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} processing the NEED_WRAP state",
session);
- }
-
- int capacity = engine.getSession().getPacketBufferSize();
- ByteBuffer outBuffer = ByteBuffer.allocate(capacity);
- SSLEngineResult result = null;
-
- while (true) {
- result = engine.wrap(EMPTY_BUFFER, outBuffer);
-
- if (result.getStatus() ==
SSLEngineResult.Status.BUFFER_OVERFLOW) {
- // TODO : increase the AppBuffer size
- } else {
- break;
- }
- }
-
- outBuffer.flip();
- session.write(outBuffer);
- hsStatus = result.getHandshakeStatus();
-
- // We continue to loop while we don't expect messages to
unwrap,
- // otherwise, we have to exit the loop.
- processingData = (hsStatus != HandshakeStatus.NEED_UNWRAP);
-
- break;
-
- case NEED_UNWRAP:
- Status status = sslHelper.processUnwrap(engine, inBuffer,
EMPTY_BUFFER);
-
- if (status == Status.BUFFER_UNDERFLOW) {
- // Read more data
- processingData = false;
- } else {
- hsStatus = engine.getHandshakeStatus();
- }
-
- break;
- }
- }
-
- if (hsStatus == HandshakeStatus.FINISHED) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} processing the FINISHED state", session);
- }
-
- session.changeState(SessionState.SECURED);
- handshaking = false;
-
- return true;
- }
-
- return false;
- }
-
/**
* Processes the Write action for the given SelectionKey
*/
@@ -616,7 +498,7 @@ public class NioSelectorProcessor implem
}
ByteBuffer buf = (ByteBuffer) wreq.getMessage();
-
+
int wrote = session.getSocketChannel().write(buf);
if (LOGGER.isDebugEnabled()) {
@@ -674,7 +556,7 @@ public class NioSelectorProcessor implem
/**
* Flushes the sessions
*/
- private void processFushSessions() throws IOException {
+ private void processFlushSessions() throws IOException {
NioTcpSession session = flushingSessions.poll();
// a key registered for read ? (because we can have a
// Selector for reads and another for the writes