Hi again,
I've gone ahead with implementing suspend/read for nio transport. Please
see the attached diff for my proposal on how traffic control could be
implemented. This is just a patch for SocketIoProcessor which adds the
ability to apply a mask to the interestOps of a SocketSessionImpl.
Beware: I haven't tested it at all yet; it's only a proposal and it may
not work at all. I would like to have your feedback. Maybe you
have a better idea for how to implement this? There might also be some
fundamental things in MINA which I haven't understood correctly. Please
let me know.
One thing I'm not sure of is what happens to the readyOps if you change
the interestOps during the same select-iteration? Will isReadable be
false if I first mask out OP_READ in the interestOps right before
processSessions() is called? If this isn't the case I guess my patch
won't work without some changes.
BTW, is this the preferred way of submitting patches or should I somehow
attach it to the JIRA issue instead?
Regards,
Niklas Therning
Index: src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java
===================================================================
--- src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java (revision 326826)
+++ src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java (working copy)
@@ -72,6 +72,8 @@
private final Queue readableSessions = new Queue();
+ private final Queue interestOpsUpdatingSessions = new Queue();
+
private Worker worker;
private long lastIdleCheckTime = System.currentTimeMillis();
@@ -131,6 +133,12 @@
selector.wakeup();
}
+ void updateInterestOps( SocketSessionImpl session ) // Requests a recomputation of the session's interest ops.
+ {
+ scheduleInterestOpsUpdate(session ); // only queues the session; the key itself is changed later by updateInterestOpsSessions()
+ selector.wakeup(); // make a blocked select() return so the queued update is picked up promptly
+ }
+
private void addSessions()
{
if( newSessions.isEmpty() )
@@ -224,6 +232,51 @@
}
}
+ private void updateInterestOpsSessions() // Drains interestOpsUpdatingSessions and applies each session's masked interest ops to its SelectionKey.
+ {
+ if( interestOpsUpdatingSessions.isEmpty() )
+ return;
+
+ for( ;; )
+ {
+ SocketSessionImpl session;
+
+ synchronized( interestOpsUpdatingSessions )
+ {
+ session = ( SocketSessionImpl ) interestOpsUpdatingSessions.pop();
+ }
+
+ if( session == null ) // a null result from pop() is treated as "queue drained"
+ break;
+
+ SocketChannel ch = session.getChannel(); // NOTE(review): 'ch' is not used anywhere else in this method
+ SelectionKey key = session.getSelectionKey();
+ // Retry later if the session has no selection key yet, i.e. it is
+ // not fully initialized (for example when a suspend/resume request
+ // is made before addSessions() has processed the session).
+ if( key == null )
+ {
+ scheduleInterestOpsUpdate( session ); // put the session back on the queue for a later pass
+ break; // stop this pass; anything still queued waits for the next call of this method
+ }
+
+ // The default interest set is OP_READ, plus OP_WRITE if there are
+ // write requests pending in the session's write queue.
+ int ops = SelectionKey.OP_READ;
+ Queue writeRequestQueue = session.getWriteRequestQueue();
+ synchronized( writeRequestQueue )
+ {
+ if( ! writeRequestQueue.isEmpty() ) {
+ ops |= SelectionKey.OP_WRITE;
+ }
+ }
+
+ // Restrict the default ops to the bits allowed by the session's mask (bitwise AND).
+ int mask = session.getInterestOpsMask();
+ key.interestOps( ops & mask ); // NOTE(review): key validity is not checked here; confirm a cancelled key cannot reach this call
+ }
+ }
+
private void processSessions( Set selectedKeys )
{
Iterator it = selectedKeys.iterator();
@@ -315,6 +368,14 @@
}
}
+ private void scheduleInterestOpsUpdate( SocketSessionImpl session ) // Enqueues the session for a later interest-ops update.
+ {
+ synchronized( interestOpsUpdatingSessions ) // same monitor that updateInterestOpsSessions() holds while popping
+ {
+ interestOpsUpdatingSessions.push( session ); // does not wake the selector; updateInterestOps() does that separately
+ }
+ }
+
private void notifyIdleSessions()
{
Set keys = selector.keys();
@@ -539,6 +600,7 @@
{
int nKeys = selector.select( 1000 );
addSessions();
+ updateInterestOpsSessions();
if( nKeys > 0 )
{
Index: src/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java
===================================================================
--- src/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java (revision 326826)
+++ src/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java (working copy)
@@ -56,6 +56,7 @@
private SelectionKey key;
private CloseFuture closeFuture = new CloseFuture();
private int readBufferSize = DEFAULT_READ_BUFFER_SIZE;
+ private int interestOpsMask = 0xffffffff;
/**
* Creates a new instance.
@@ -87,6 +88,16 @@
return ch;
}
+ int getInterestOpsMask() // Returns the bit mask that SocketIoProcessor ANDs with the interest ops it computes for this session.
+ {
+ return interestOpsMask; // initialized to 0xffffffff, i.e. no operation is masked out by default
+ }
+
+ void setInterestOpsMask(int interestOpsMask) // Stores a new interest-ops mask; this setter does not itself schedule an update of the selection key.
+ {
+ this.interestOpsMask = interestOpsMask; // NOTE(review): plain field write, neither volatile nor synchronized - confirm visibility to the thread that reads it
+ }
+
SelectionKey getSelectionKey()
{
return key;