Author: elecharny
Date: Tue Dec  6 12:41:01 2011
New Revision: 1210888

URL: http://svn.apache.org/viewvc?rev=1210888&view=rev
Log:
o Added the logic to protect the writeQueue against concurrent modifications 
while writing the messages
o Added some code to handle the future SSL processing (not yet implemented)
o Renamed the session.getWriteQueue() to session.acquireWriteQueue()
o Added a session.releaseWriteQueue() method

Modified:
    mina/trunk/core/src/main/java/org/apache/mina/api/IoSession.java
    mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
    
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java

Modified: mina/trunk/core/src/main/java/org/apache/mina/api/IoSession.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/api/IoSession.java?rev=1210888&r1=1210887&r2=1210888&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/api/IoSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/api/IoSession.java Tue Dec  6 
12:41:01 2011
@@ -300,11 +300,28 @@ public interface IoSession {
     public WriteRequest enqueueWriteRequest(Object message);
 
     /**
-     * Get the {@link Queue} of this session. The write queue contains the 
pending writes.
+     * Get the {@link Queue} of this session. The write queue contains the 
pending writes. This
+     * method will lock the WriteQueue using the WriteLock lock. The {@link 
#releaseWriteQueue()}
+     * method must be called when finished : <br/>
+     * <code>
+     *   try {
+     *       Queue<WriteRequest> queue = session.acquireWriteQueue();
+     *       ...
+     *       // We use the queue here
+     *       ...
+     *   } finally {
+     *       session.releaseWriteQueue();
+     *   }
+     * </code>
      * 
      * @return the write queue of this session
      */
-    public Queue<WriteRequest> getWriteQueue();
+    public Queue<WriteRequest> acquireWriteQueue();
+    
+    /**
+     * Release the WriteQueue after having acquired it with the {@link 
#acquireWriteQueue()} method.
+     */
+    public void releaseWriteQueue();
 
     /**
      * Get the filter chain in charge of filtering events generated by this 
session.

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java?rev=1210888&r1=1210887&r2=1210888&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java 
(original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java 
Tue Dec  6 12:41:01 2011
@@ -25,6 +25,9 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.mina.api.IoFuture;
 import org.apache.mina.api.IoService;
@@ -87,6 +90,15 @@ public abstract class AbstractIoSession 
     /** the queue of pending writes for the session, to be dequeued by the 
{@link SelectorProcessor} */
     private Queue<WriteRequest> writeQueue = new DefaultWriteQueue();
 
+    /** A lock to protect the access to the write queue */
+    private final ReadWriteLock writeQueueLock = new ReentrantReadWriteLock();
+    
+    /** A Read lock on the reentrant writeQueue lock */
+    private final Lock writeQueueReadLock = writeQueueLock.readLock();
+
+    /** A Write lock on the reentrant writeQueue lock */
+    private final Lock writeQueueWriteLock = writeQueueLock.writeLock();
+
     private IoFilterController filterProcessor;
     
     /**
@@ -314,7 +326,13 @@ public abstract class AbstractIoSession 
      */
     public WriteRequest enqueueWriteRequest(Object message) {
         DefaultWriteRequest request = new DefaultWriteRequest(message);
-        writeQueue.add(request);
+        
+        try {
+            writeQueueReadLock.lock();
+            writeQueue.add(request);
+        } finally {
+            writeQueueReadLock.unlock();
+        }
 
         // If it wasn't, we register this session as interested to write.
         // It's done in atomic fashion for avoiding two concurrent registering.
@@ -333,9 +351,18 @@ public abstract class AbstractIoSession 
      * {@inheritDoc}
      */
     @Override
-    public Queue<WriteRequest> getWriteQueue() {
+    public Queue<WriteRequest> acquireWriteQueue() {
+        writeQueueWriteLock.lock();
         return writeQueue;
     }
+    
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void releaseWriteQueue() {
+        writeQueueWriteLock.unlock();
+    }
 
     /**
      * {@inheritDoc}

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java?rev=1210888&r1=1210887&r2=1210888&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
 Tue Dec  6 12:41:01 2011
@@ -386,7 +386,8 @@ public class NioSelectorProcessor implem
                                     readBuffer.flip();
                                     
                                     if (session.isSecured() && 
!session.isConnectedSecured()) {
-                                            // Process the SSL handshake now
+                                        // Process the SSL handshake now
+                                        //processHandShake(session, 
readBuffer);
                                     } else {
                                         
session.getFilterChain().processMessageReceived(session, readBuffer);
                                     }
@@ -399,46 +400,59 @@ public class NioSelectorProcessor implem
                                 }
                                 NioTcpSession session = (NioTcpSession) 
key.attachment();
                                 session.setNotRegisteredForWrite();
-                                // write from the session write queue
-                                Queue<WriteRequest> queue = 
session.getWriteQueue();
-
-                                do {
-                                    // get a write request from the queue
-                                    WriteRequest wreq = queue.peek();
-                                    if (wreq == null) {
-                                        break;
-                                    }
-                                    ByteBuffer buf = (ByteBuffer) 
wreq.getMessage();
-
-                                    int wrote = 
session.getSocketChannel().write(buf);
-                                    if (LOGGER.isDebugEnabled()) {
-                                        LOGGER.debug("wrote {} bytes to {}", 
wrote, session);
-                                    }
 
-                                    if (buf.remaining() == 0) {
-                                        // completed write request, let's 
remove
-                                        // it
-                                        queue.remove();
-                                        // complete the future
-                                        DefaultWriteFuture future = 
(DefaultWriteFuture) wreq.getFuture();
-                                        if (future != null) {
-                                            future.complete();
+                                // write from the session write queue
+                                boolean isEmpty = false;
+                                
+                                try {
+                                    Queue<WriteRequest> queue = 
session.acquireWriteQueue();
+    
+                                    do {
+                                        // get a write request from the queue
+                                        WriteRequest wreq = queue.peek();
+                                        
+                                        if (wreq == null) {
+                                            break;
                                         }
-                                    } else {
-                                        // output socket buffer is full, we 
need
-                                        // to give up until next selection for
-                                        // writing
-                                        break;
-                                    }
-
-                                } while (!queue.isEmpty());
+                                        
+                                        ByteBuffer buf = (ByteBuffer) 
wreq.getMessage();
+    
+                                        int wrote = 
session.getSocketChannel().write(buf);
+                                        
+                                        if (LOGGER.isDebugEnabled()) {
+                                            LOGGER.debug("wrote {} bytes to 
{}", wrote, session);
+                                        }
+    
+                                        if (buf.remaining() == 0) {
+                                            // completed write request, let's 
remove
+                                            // it
+                                            queue.remove();
+                                            // complete the future
+                                            DefaultWriteFuture future = 
(DefaultWriteFuture) wreq.getFuture();
+                                            
+                                            if (future != null) {
+                                                future.complete();
+                                            }
+                                        } else {
+                                            // output socket buffer is full, 
we need
+                                            // to give up until next selection 
for
+                                            // writing
+                                            break;
+                                        }
+                                    } while (!queue.isEmpty());
+                                    
+                                    isEmpty = queue.isEmpty();
+                                } finally {
+                                    session.releaseWriteQueue();
+                                }
 
                                 // if the session is no more interested in 
writing, we need
                                 // to stop listening for OP_WRITE events
-                                if (queue.isEmpty()) {
+                                if (isEmpty) {
                                     // a key registered for read ? (because we 
can have a
                                     // Selector for reads and another for the 
writes
                                     SelectionKey readKey = 
sessionReadKey.get(session);
+                                    
                                     if (readKey != null) {
                                         LOGGER.debug("registering key for only 
reading");
                                         SelectionKey mykey = 
session.getSocketChannel().register(selector,
@@ -449,7 +463,6 @@ public class NioSelectorProcessor implem
                                         
session.getSocketChannel().keyFor(selector).cancel();
                                     }
                                 }
-
                             }
 
                             if (key.isAcceptable()) {


Reply via email to