Hi,

I have attached a diff against the current trunk which contains my first
attempt at implementing traffic control. I have concentrated on TCP
connections, so the suspendRead(), resumeRead(), suspendWrite() and
resumeWrite() methods have only been added to SocketSession for now.
Please let me know what you think of the code, and whether you have any
ideas on how to test it. I have tested suspending and resuming read
operations in our application, and it seems to work just fine, but I
would really like to write a TestCase that exercises suspending and
resuming both reads and writes.

/Niklas

Index: src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java
===================================================================
--- src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java	(revision 329884)
+++ src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java	(working copy)
@@ -75,6 +75,8 @@
 
     private final Queue readableSessions = new Queue();
 
+    private final Queue interestOpsUpdatingSessions = new Queue();
+    
     private Worker worker;
 
     private long lastIdleCheckTime = System.currentTimeMillis();
@@ -134,6 +136,12 @@
         selector.wakeup();
     }
 
+    void updateInterestOps( SocketSessionImpl session )
+    {
+        scheduleInterestOpsUpdate( session );
+        selector.wakeup();
+    }
+    
     private void addSessions()
     {
         if( newSessions.isEmpty() )
@@ -227,6 +235,61 @@
         }
     }
 
+    private void updateInterestOpsSessions() 
+    {
+        if( interestOpsUpdatingSessions.isEmpty() )
+            return;
+
+        for( ;; )
+        {
+            SocketSessionImpl session;
+
+            synchronized( interestOpsUpdatingSessions )
+            {
+                session = ( SocketSessionImpl ) interestOpsUpdatingSessions.pop();
+            }
+
+            if( session == null )
+                break;
+
+            SocketChannel ch = session.getChannel();
+            SelectionKey key = session.getSelectionKey();
+            // Retry later if the session is not yet fully initialized
+            // (i.e. suspendRead/Write() or resumeRead/Write() was called
+            // before the addSession() request has been processed).
+            if( key == null )
+            {
+                scheduleInterestOpsUpdate( session );
+                break;
+            }
+            
+            // The default interest set is OP_READ plus, if there are write
+            // requests pending in the session's write queue, OP_WRITE.
+            int ops = SelectionKey.OP_READ;
+            Queue writeRequestQueue = session.getWriteRequestQueue();
+            synchronized( writeRequestQueue )
+            {
+                if( ! writeRequestQueue.isEmpty() )
+                {
+                    ops |= SelectionKey.OP_WRITE;
+                }
+            }
+            
+            // Now mask the preferred ops with the mask of the current session
+            int mask = session.getInterestOpsMask();
+            
+            try
+            {
+                key.interestOps( ops & mask );
+            }
+            catch( CancelledKeyException e )
+            {
+                // Connection is closed unexpectedly.
+                scheduleRemove( session );
+            }
+        }
+    }
+    
     private void processSessions( Set selectedKeys )
     {
         Iterator it = selectedKeys.iterator();
@@ -236,12 +299,13 @@
             SelectionKey key = ( SelectionKey ) it.next();
             SocketSessionImpl session = ( SocketSessionImpl ) key.attachment();
 
-            if( key.isReadable() )
+            int mask = session.getInterestOpsMask();
+            if( key.isReadable() && ( mask & SelectionKey.OP_READ ) > 0 )
             {
                 read( session );
             }
 
-            if( key.isWritable() )
+            if( key.isWritable() && ( mask & SelectionKey.OP_WRITE ) > 0 )
             {
                 scheduleFlush( session );
             }
@@ -316,6 +380,14 @@
         }
     }
 
+    private void scheduleInterestOpsUpdate( SocketSessionImpl session )
+    {
+        synchronized( interestOpsUpdatingSessions )
+        {
+            interestOpsUpdatingSessions.push( session );
+        }
+    }
+    
     private void notifyIdleSessions()
     {
         // process idle sessions
@@ -524,6 +596,7 @@
                 {
                     int nKeys = selector.select( 1000 );
                     addSessions();
+                    updateInterestOpsSessions();
 
                     if( nKeys > 0 )
                     {
Index: src/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java
===================================================================
--- src/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java	(revision 329884)
+++ src/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java	(working copy)
@@ -56,6 +56,7 @@
     private SelectionKey key;
     private CloseFuture closeFuture = new CloseFuture();
     private int readBufferSize = DEFAULT_READ_BUFFER_SIZE;
+    private int interestOpsMask = 0xffffffff;
 
     /**
      * Creates a new instance.
@@ -87,6 +88,48 @@
         return ch;
     }
 
+    int getInterestOpsMask()
+    {
+        return interestOpsMask;
+    }
+
+    void setInterestOpsMask( int interestOpsMask ) 
+    {
+        this.interestOpsMask = interestOpsMask;
+    }
+
+    public void resumeRead() {
+        if( (interestOpsMask & SelectionKey.OP_READ) == 0 )
+        {
+            interestOpsMask |= SelectionKey.OP_READ;
+            SocketIoProcessor.getInstance().updateInterestOps( this );
+        }
+    }
+
+    public void suspendRead() {
+        if( (interestOpsMask & SelectionKey.OP_READ) > 0 )
+        {
+            interestOpsMask &= ~SelectionKey.OP_READ;
+            SocketIoProcessor.getInstance().updateInterestOps( this );
+        }
+    }
+
+    public void resumeWrite() {
+        if( (interestOpsMask & SelectionKey.OP_WRITE) == 0 )
+        {
+            interestOpsMask |= SelectionKey.OP_WRITE;
+            SocketIoProcessor.getInstance().updateInterestOps( this );
+        }
+    }
+
+    public void suspendWrite() {
+        if( (interestOpsMask & SelectionKey.OP_WRITE) > 0 ) 
+        {
+            interestOpsMask &= ~SelectionKey.OP_WRITE;
+            SocketIoProcessor.getInstance().updateInterestOps( this );
+        }
+    }
+    
     SelectionKey getSelectionKey()
     {
         return key;
Index: src/java/org/apache/mina/transport/socket/nio/SocketSession.java
===================================================================
--- src/java/org/apache/mina/transport/socket/nio/SocketSession.java	(revision 329884)
+++ src/java/org/apache/mina/transport/socket/nio/SocketSession.java	(working copy)
@@ -48,4 +48,8 @@
     void setReceiveBufferSize( int size ) throws SocketException;
     int getSessionReceiveBufferSize();
     void setSessionReceiveBufferSize( int size );
+    void suspendRead();
+    void resumeRead();
+    void suspendWrite();
+    void resumeWrite();
 }

Reply via email to