Author: proyal
Date: Sun Sep  2 09:54:52 2007
New Revision: 572031

URL: http://svn.apache.org/viewvc?rev=572031&view=rev
Log:
DIRMINA-429 and DIRMINA-430 - The SocketSessionImpl keeps track of whether or 
not it is in the flush queue, removing the need to check the size of its write 
request queue. Also have the SocketSessionImpl keep track of the # of 
outstanding bytes-to-be-written in an AtomicInteger.

Modified:
    
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java
    
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
    
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java

Modified: 
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java
URL: 
http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java?rev=572031&r1=572030&r2=572031&view=diff
==============================================================================
--- 
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java
 (original)
+++ 
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java
 Sun Sep  2 09:54:52 2007
@@ -46,14 +46,14 @@
 
         // SocketIoProcessor.doFlush() will reset it after write is finished
         // because the buffer will be passed with messageSent event.
-        ((ByteBuffer) writeRequest.getMessage()).mark();
-        synchronized (writeRequestQueue) {
-            writeRequestQueue.add(writeRequest);
-            if (writeRequestQueue.size() == 1
-                    && session.getTrafficMask().isWritable()) {
-                // Notify SocketIoProcessor only when writeRequestQueue was 
empty.
-                s.getIoProcessor().flush(s);
-            }
+        ByteBuffer buffer = (ByteBuffer) writeRequest.getMessage();
+        buffer.mark();
+
+        s.getScheduledWriteBytesCounter().addAndGet(buffer.remaining());
+        writeRequestQueue.add(writeRequest);
+
+        if (session.getTrafficMask().isWritable()) {
+            s.getIoProcessor().flush(s);
         }
     }
 

Modified: 
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
URL: 
http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?rev=572031&r1=572030&r2=572031&view=diff
==============================================================================
--- 
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
 (original)
+++ 
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
 Sun Sep  2 09:54:52 2007
@@ -89,10 +89,11 @@
     }
 
     void flush(SocketSessionImpl session) {
-        scheduleFlush(session);
-        Selector selector = this.selector;
-        if (selector != null) {
-            selector.wakeup();
+        if ( scheduleFlush(session) ) {
+            Selector selector = this.selector;
+            if (selector != null) {
+                selector.wakeup();
+            }
         }
     }
 
@@ -108,8 +109,14 @@
         removingSessions.add(session);
     }
 
-    private void scheduleFlush(SocketSessionImpl session) {
-        flushingSessions.add(session);
+    private boolean scheduleFlush(SocketSessionImpl session) {
+        if ( session.getInFlushQueue().compareAndSet( false, true ) ) {
+            flushingSessions.add(session);
+
+            return true;
+        }
+
+        return false;
     }
 
     private void scheduleTrafficControl(SocketSessionImpl session) {
@@ -210,7 +217,7 @@
             if (readBytes > 0) {
                 session.getFilterChain().fireMessageReceived(session, buf);
                 buf = null;
-                
+
                 if (readBytes * 2 < session.getReadBufferSize()) {
                     if (session.getReadBufferSize() > 64) {
                         session.setReadBufferSize(session.getReadBufferSize() 
>>> 1);
@@ -299,6 +306,8 @@
             if (session == null)
                 break;
 
+            session.getInFlushQueue().set( false );
+
             if (!session.isConnected()) {
                 releaseWriteBuffers(session);
                 continue;
@@ -318,7 +327,10 @@
             }
 
             try {
-                doFlush(session);
+                boolean flushedAll = doFlush(session);
+                if( flushedAll && !session.getWriteRequestQueue().isEmpty() && 
!session.getInFlushQueue().get()) {
+                    scheduleFlush( session );
+                }
             } catch (IOException e) {
                 scheduleRemove(session);
                 session.getFilterChain().fireExceptionCaught(session, e);
@@ -329,11 +341,12 @@
     private void releaseWriteBuffers(SocketSessionImpl session) {
         Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
         WriteRequest req;
-        
-        if ((req = writeRequestQueue.poll()) != null) {
+
+        while ((req = writeRequestQueue.poll()) != null) {
             ByteBuffer buf = (ByteBuffer) req.getMessage();
             try {
                 buf.release();
+                session.getScheduledWriteBytesCounter().addAndGet( 
-buf.remaining() );
             } catch (IllegalStateException e) {
                 session.getFilterChain().fireExceptionCaught(session, e);
             } finally {
@@ -342,7 +355,7 @@
                 if (buf.hasRemaining()) {
                     req.getFuture().setWritten(false);
                 } else {
-                    session.getFilterChain().fireMessageSent(session, req);    
                
+                    session.getFilterChain().fireMessageSent(session, req);
                 }
             }
 
@@ -359,7 +372,7 @@
         }
     }
 
-    private void doFlush(SocketSessionImpl session) throws IOException {
+    private boolean doFlush(SocketSessionImpl session) throws IOException {
         // Clear OP_WRITE
         SelectionKey key = session.getSelectionKey();
         key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
@@ -372,34 +385,36 @@
         try {
             for (;;) {
                 WriteRequest req = writeRequestQueue.peek();
-    
+
                 if (req == null)
                     break;
-    
+
                 ByteBuffer buf = (ByteBuffer) req.getMessage();
                 if (buf.remaining() == 0) {
                     writeRequestQueue.poll();
-    
+
                     session.increaseWrittenMessages();
-    
+
                     buf.reset();
                     session.getFilterChain().fireMessageSent(session, req);
                     continue;
                 }
-    
+
                 if (key.isWritable()) {
                     writtenBytes += ch.write(buf.buf());
                 }
-    
+
                 if (buf.hasRemaining() || writtenBytes >= maxWrittenBytes) {
                     // Kernel buffer is full or wrote too much.
                     key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-                    break;
+                    return false;
                 }
             }
         } finally {
             session.increaseWrittenBytes(writtenBytes);
         }
+
+        return true;
     }
 
     private void doUpdateTrafficMask() {

Modified: 
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
URL: 
http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java?rev=572031&r1=572030&r2=572031&view=diff
==============================================================================
--- 
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
 (original)
+++ 
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
 Sun Sep  2 09:54:52 2007
@@ -25,6 +25,8 @@
 import java.nio.channels.SocketChannel;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.IoFilterChain;
@@ -71,6 +73,10 @@
 
     private final IoServiceListenerSupport serviceListeners;
 
+    private final AtomicBoolean inFlushQueue = new AtomicBoolean( false );
+
+    private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
+
     private SelectionKey key;
 
     private int readBufferSize = 1024;
@@ -184,13 +190,18 @@
      * @throws ClassCastException if an element is not a [EMAIL PROTECTED] 
ByteBuffer}
      */
     public int getScheduledWriteBytes() {
-        int byteSize = 0;
+        return scheduledWriteBytes.get();
+    }
 
-        for (WriteRequest request : writeRequestQueue) {
-            byteSize += ((ByteBuffer) request.getMessage()).remaining();
-        }
+    @Override
+    public void increaseWrittenBytes( int increment ) {
+        super.increaseWrittenBytes( increment );
 
-        return byteSize;
+        scheduledWriteBytes.addAndGet( -increment );
+    }
+
+    AtomicInteger getScheduledWriteBytesCounter() {
+        return scheduledWriteBytes;
     }
 
     @Override
@@ -222,9 +233,13 @@
     int getReadBufferSize() {
         return readBufferSize;
     }
-    
+
     void setReadBufferSize(int readBufferSize) {
         this.readBufferSize = readBufferSize;
+    }
+
+    AtomicBoolean getInFlushQueue() {
+        return inFlushQueue;
     }
 
     private class SessionConfigImpl extends BaseIoSessionConfig implements


Reply via email to