Hi again,

I've gone ahead with implementing suspend/read for nio transport. Please
see the attached diff for my proposal on how traffic control could be
implemented. This is just a patch for SocketIoProcessor which adds the
ability to apply a mask to the interestOps of a SocketSessionImpl.

Beware, I haven't tested it at all yet, it's only a proposal and maybe
it doesn't work at all. I would like to have your feedback. Maybe you
have a better idea for how to implement this? There might also be some
fundamental things in mina which I havent understood correctly. Please
let me know.

One thing I'm not sure of is what happens to the readyOps if you change
the interestOps during the same select-iteration? Will isReadable be
false if I first mask out OP_READ in the interestOps right before
processSessions() is called? If this isn't the case I guess my patch
won't work without some changes.

BTW, is this the preferred way of submitting patches or should I somehow
attach it to the JIRA issue instead?

Regards,
Niklas Therning

Index: src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java
===================================================================
--- src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java	(revision 326826)
+++ src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java	(working copy)
@@ -72,6 +72,8 @@
 
     private final Queue readableSessions = new Queue();
 
+    private final Queue interestOpsUpdatingSessions = new Queue();
+    
     private Worker worker;
 
     private long lastIdleCheckTime = System.currentTimeMillis();
@@ -131,6 +133,12 @@
         selector.wakeup();
     }
 
+    void updateInterestOps( SocketSessionImpl session )
+    {
+        scheduleInterestOpsUpdate(session );
+        selector.wakeup();
+    }
+    
     private void addSessions()
     {
         if( newSessions.isEmpty() )
@@ -224,6 +232,51 @@
         }
     }
 
+    private void updateInterestOpsSessions() 
+    {
+        if( interestOpsUpdatingSessions.isEmpty() )
+            return;
+
+        for( ;; )
+        {
+            SocketSessionImpl session;
+
+            synchronized( interestOpsUpdatingSessions )
+            {
+                session = ( SocketSessionImpl ) interestOpsUpdatingSessions.pop();
+            }
+
+            if( session == null )
+                break;
+
+            SocketChannel ch = session.getChannel();
+            SelectionKey key = session.getSelectionKey();
+            // Retry later if session is not yet fully initialized.
+            // (In case that Session.suspend??() or session.resume??() is 
+            // called before addSession() is processed)
+            if( key == null )
+            {
+                scheduleInterestOpsUpdate( session );
+                break;
+            }
+            
+            // The normal is OP_READ and, if there are write requests in the
+            // session's write queue, OP_WRITE
+            int ops = SelectionKey.OP_READ;
+            Queue writeRequestQueue = session.getWriteRequestQueue();
+            synchronized( writeRequestQueue )
+            {
+                if( ! writeRequestQueue.isEmpty() ) {
+                    ops |= SelectionKey.OP_WRITE;
+                }
+            }
+            
+            // Now mask the preferred ops with the mask of the current session
+            int mask = session.getInterestOpsMask();
+            key.interestOps( ops & mask );
+        }
+    }
+    
     private void processSessions( Set selectedKeys )
     {
         Iterator it = selectedKeys.iterator();
@@ -315,6 +368,14 @@
         }
     }
 
+    private void scheduleInterestOpsUpdate( SocketSessionImpl session )
+    {
+        synchronized( interestOpsUpdatingSessions )
+        {
+            interestOpsUpdatingSessions.push( session );
+        }
+    }
+    
     private void notifyIdleSessions()
     {
         Set keys = selector.keys();
@@ -539,6 +600,7 @@
                 {
                     int nKeys = selector.select( 1000 );
                     addSessions();
+                    updateInterestOpsSessions();
 
                     if( nKeys > 0 )
                     {
Index: src/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java
===================================================================
--- src/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java	(revision 326826)
+++ src/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java	(working copy)
@@ -56,6 +56,7 @@
     private SelectionKey key;
     private CloseFuture closeFuture = new CloseFuture();
     private int readBufferSize = DEFAULT_READ_BUFFER_SIZE;
+    private int interestOpsMask = 0xffffffff;
 
     /**
      * Creates a new instance.
@@ -87,6 +88,16 @@
         return ch;
     }
 
+    int getInterestOpsMask()
+    {
+        return interestOpsMask;
+    }
+
+    void setInterestOpsMask(int interestOpsMask) 
+    {
+        this.interestOpsMask = interestOpsMask;
+    }
+
     SelectionKey getSelectionKey()
     {
         return key;

Reply via email to