Author: elecharny
Date: Tue Dec 6 12:41:01 2011
New Revision: 1210888
URL: http://svn.apache.org/viewvc?rev=1210888&view=rev
Log:
o Added the logic to protect the writeQueue against concurrent modifications
while writing the messages
o Added some code to handle the future SSL processing (not yet implemented)
o Renamed the session.getWriteQueue() to session.acquireWriteQueue()
o Added a session.releaseWriteQueue() method
Modified:
mina/trunk/core/src/main/java/org/apache/mina/api/IoSession.java
mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
Modified: mina/trunk/core/src/main/java/org/apache/mina/api/IoSession.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/api/IoSession.java?rev=1210888&r1=1210887&r2=1210888&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/api/IoSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/api/IoSession.java Tue Dec 6
12:41:01 2011
@@ -300,11 +300,28 @@ public interface IoSession {
public WriteRequest enqueueWriteRequest(Object message);
/**
- * Get the {@link Queue} of this session. The write queue contains the
pending writes.
+ * Get the {@link Queue} of this session. The write queue contains the
pending writes. This
+ * method will lock the WriteQueue using the WriteLock lock. The {@link
#releaseWriteQueue()}
+ * method must be called when finished: <br/>
+ * <code>
+ * try {
+ * Queue<WriteRequest> queue = session.acquireWriteQueue();
+ * ...
+ * // We use the queue here
+ * ...
+ * } finally {
+ * session.releaseWriteQueue();
+ * }
+ * </code>
*
* @return the write queue of this session
*/
- public Queue<WriteRequest> getWriteQueue();
+ public Queue<WriteRequest> acquireWriteQueue();
+
+ /**
+ * Release the WriteQueue after having acquired it with the {@link
#acquireWriteQueue()} method.
+ */
+ public void releaseWriteQueue();
/**
* Get the filter chain in charge of filtering events generated by this
session.
Modified:
mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java?rev=1210888&r1=1210887&r2=1210888&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
Tue Dec 6 12:41:01 2011
@@ -25,6 +25,9 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.mina.api.IoFuture;
import org.apache.mina.api.IoService;
@@ -87,6 +90,15 @@ public abstract class AbstractIoSession
/** the queue of pending writes for the session, to be dequeued by the
{@link SelectorProcessor} */
private Queue<WriteRequest> writeQueue = new DefaultWriteQueue();
+ /** A lock to protect the access to the write queue */
+ private final ReadWriteLock writeQueueLock = new ReentrantReadWriteLock();
+
+ /** A Read lock on the reentrant writeQueue lock */
+ private final Lock writeQueueReadLock = writeQueueLock.readLock();
+
+ /** A Write lock on the reentrant writeQueue lock */
+ private final Lock writeQueueWriteLock = writeQueueLock.writeLock();
+
private IoFilterController filterProcessor;
/**
@@ -314,7 +326,13 @@ public abstract class AbstractIoSession
*/
public WriteRequest enqueueWriteRequest(Object message) {
DefaultWriteRequest request = new DefaultWriteRequest(message);
- writeQueue.add(request);
+
+ try {
+ writeQueueReadLock.lock();
+ writeQueue.add(request);
+ } finally {
+ writeQueueReadLock.unlock();
+ }
// If it wasn't, we register this session as interested to write.
// It's done in atomic fashion for avoiding two concurrent registering.
@@ -333,9 +351,18 @@ public abstract class AbstractIoSession
* {@inheritDoc}
*/
@Override
- public Queue<WriteRequest> getWriteQueue() {
+ public Queue<WriteRequest> acquireWriteQueue() {
+ writeQueueWriteLock.lock();
return writeQueue;
}
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void releaseWriteQueue() {
+ writeQueueWriteLock.unlock();
+ }
/**
* {@inheritDoc}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java?rev=1210888&r1=1210887&r2=1210888&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
Tue Dec 6 12:41:01 2011
@@ -386,7 +386,8 @@ public class NioSelectorProcessor implem
readBuffer.flip();
if (session.isSecured() &&
!session.isConnectedSecured()) {
- // Process the SSL handshake now
+ // Process the SSL handshake now
+ //processHandShake(session,
readBuffer);
} else {
session.getFilterChain().processMessageReceived(session, readBuffer);
}
@@ -399,46 +400,59 @@ public class NioSelectorProcessor implem
}
NioTcpSession session = (NioTcpSession)
key.attachment();
session.setNotRegisteredForWrite();
- // write from the session write queue
- Queue<WriteRequest> queue =
session.getWriteQueue();
-
- do {
- // get a write request from the queue
- WriteRequest wreq = queue.peek();
- if (wreq == null) {
- break;
- }
- ByteBuffer buf = (ByteBuffer)
wreq.getMessage();
-
- int wrote =
session.getSocketChannel().write(buf);
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("wrote {} bytes to {}",
wrote, session);
- }
- if (buf.remaining() == 0) {
- // completed write request, let's
remove
- // it
- queue.remove();
- // complete the future
- DefaultWriteFuture future =
(DefaultWriteFuture) wreq.getFuture();
- if (future != null) {
- future.complete();
+ // write from the session write queue
+ boolean isEmpty = false;
+
+ try {
+ Queue<WriteRequest> queue =
session.acquireWriteQueue();
+
+ do {
+ // get a write request from the queue
+ WriteRequest wreq = queue.peek();
+
+ if (wreq == null) {
+ break;
}
- } else {
- // output socket buffer is full, we
need
- // to give up until next selection for
- // writing
- break;
- }
-
- } while (!queue.isEmpty());
+
+ ByteBuffer buf = (ByteBuffer)
wreq.getMessage();
+
+ int wrote =
session.getSocketChannel().write(buf);
+
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("wrote {} bytes to
{}", wrote, session);
+ }
+
+ if (buf.remaining() == 0) {
+ // completed write request, let's
remove
+ // it
+ queue.remove();
+ // complete the future
+ DefaultWriteFuture future =
(DefaultWriteFuture) wreq.getFuture();
+
+ if (future != null) {
+ future.complete();
+ }
+ } else {
+ // output socket buffer is full,
we need
+ // to give up until next selection
for
+ // writing
+ break;
+ }
+ } while (!queue.isEmpty());
+
+ isEmpty = queue.isEmpty();
+ } finally {
+ session.releaseWriteQueue();
+ }
// if the session is no more interested in
writing, we need
// to stop listening for OP_WRITE events
- if (queue.isEmpty()) {
+ if (isEmpty) {
// a key registered for read ? (because we
can have a
// Selector for reads and another for the
writes
SelectionKey readKey =
sessionReadKey.get(session);
+
if (readKey != null) {
LOGGER.debug("registering key for only
reading");
SelectionKey mykey =
session.getSocketChannel().register(selector,
@@ -449,7 +463,6 @@ public class NioSelectorProcessor implem
session.getSocketChannel().keyFor(selector).cancel();
}
}
-
}
if (key.isAcceptable()) {