Hi,
I have attached a diff against the current trunk which contains my shot
at implementing traffic control. I have concentrated on TCP connections
so the suspend/resume/Read/Write methods have been added to
SocketSession for now. Please let me know what you think of the code and
if you have any idea on how to test this. I have tested
suspending/resuming read operations in our application and it seems to
be working just fine. But I would really like to write a TestCase
testing suspending/resuming reads and writes.
/Niklas
Index: src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java
===================================================================
--- src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java (revision 329884)
+++ src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java (working copy)
@@ -75,6 +75,8 @@
private final Queue readableSessions = new Queue();
+ private final Queue interestOpsUpdatingSessions = new Queue();
+
private Worker worker;
private long lastIdleCheckTime = System.currentTimeMillis();
@@ -134,6 +136,12 @@
selector.wakeup();
}
+ void updateInterestOps( SocketSessionImpl session )
+ {
+ scheduleInterestOpsUpdate( session );
+ selector.wakeup();
+ }
+
private void addSessions()
{
if( newSessions.isEmpty() )
@@ -227,6 +235,61 @@
}
}
+ private void updateInterestOpsSessions()
+ {
+ if( interestOpsUpdatingSessions.isEmpty() )
+ return;
+
+ for( ;; )
+ {
+ SocketSessionImpl session;
+
+ synchronized( interestOpsUpdatingSessions )
+ {
+ session = ( SocketSessionImpl ) interestOpsUpdatingSessions.pop();
+ }
+
+ if( session == null )
+ break;
+
+ SocketChannel ch = session.getChannel();
+ SelectionKey key = session.getSelectionKey();
+ // Retry later if session is not yet fully initialized.
+ // (In case that Session.suspend??() or session.resume??() is
+ // called before addSession() is processed)
+ if( key == null )
+ {
+ scheduleInterestOpsUpdate( session );
+ break;
+ }
+
+ // The normal is OP_READ and, if there are write requests in the
+ // session's write queue, OP_WRITE
+ int ops = SelectionKey.OP_READ;
+ Queue writeRequestQueue = session.getWriteRequestQueue();
+ synchronized( writeRequestQueue )
+ {
+ if( ! writeRequestQueue.isEmpty() )
+ {
+ ops |= SelectionKey.OP_WRITE;
+ }
+ }
+
+ // Now mask the preferred ops with the mask of the current session
+ int mask = session.getInterestOpsMask();
+
+ try
+ {
+ key.interestOps( ops & mask );
+ }
+ catch( CancelledKeyException e )
+ {
+ // Connection is closed unexpectedly.
+ scheduleRemove( session );
+ }
+ }
+ }
+
private void processSessions( Set selectedKeys )
{
Iterator it = selectedKeys.iterator();
@@ -236,12 +299,13 @@
SelectionKey key = ( SelectionKey ) it.next();
SocketSessionImpl session = ( SocketSessionImpl ) key.attachment();
- if( key.isReadable() )
+ int mask = session.getInterestOpsMask();
+ if( key.isReadable() && ( mask & SelectionKey.OP_READ ) > 0 )
{
read( session );
}
- if( key.isWritable() )
+ if( key.isWritable() && ( mask & SelectionKey.OP_WRITE ) > 0 )
{
scheduleFlush( session );
}
@@ -316,6 +380,14 @@
}
}
+ private void scheduleInterestOpsUpdate( SocketSessionImpl session )
+ {
+ synchronized( interestOpsUpdatingSessions )
+ {
+ interestOpsUpdatingSessions.push( session );
+ }
+ }
+
private void notifyIdleSessions()
{
// process idle sessions
@@ -524,6 +596,7 @@
{
int nKeys = selector.select( 1000 );
addSessions();
+ updateInterestOpsSessions();
if( nKeys > 0 )
{
Index: src/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java
===================================================================
--- src/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java (revision 329884)
+++ src/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java (working copy)
@@ -56,6 +56,7 @@
private SelectionKey key;
private CloseFuture closeFuture = new CloseFuture();
private int readBufferSize = DEFAULT_READ_BUFFER_SIZE;
+ private int interestOpsMask = 0xffffffff;
/**
* Creates a new instance.
@@ -87,6 +88,48 @@
return ch;
}
+ int getInterestOpsMask()
+ {
+ return interestOpsMask;
+ }
+
+ void setInterestOpsMask( int interestOpsMask )
+ {
+ this.interestOpsMask = interestOpsMask;
+ }
+
+ public void resumeRead() {
+ if( (interestOpsMask & SelectionKey.OP_READ) == 0 )
+ {
+ interestOpsMask |= SelectionKey.OP_READ;
+ SocketIoProcessor.getInstance().updateInterestOps( this );
+ }
+ }
+
+ public void suspendRead() {
+ if( (interestOpsMask & SelectionKey.OP_READ) > 0 )
+ {
+ interestOpsMask &= ~SelectionKey.OP_READ;
+ SocketIoProcessor.getInstance().updateInterestOps( this );
+ }
+ }
+
+ public void resumeWrite() {
+ if( (interestOpsMask & SelectionKey.OP_WRITE) == 0 )
+ {
+ interestOpsMask |= SelectionKey.OP_WRITE;
+ SocketIoProcessor.getInstance().updateInterestOps( this );
+ }
+ }
+
+ public void suspendWrite() {
+ if( (interestOpsMask & SelectionKey.OP_WRITE) > 0 )
+ {
+ interestOpsMask &= ~SelectionKey.OP_WRITE;
+ SocketIoProcessor.getInstance().updateInterestOps( this );
+ }
+ }
+
SelectionKey getSelectionKey()
{
return key;
Index: src/java/org/apache/mina/transport/socket/nio/SocketSession.java
===================================================================
--- src/java/org/apache/mina/transport/socket/nio/SocketSession.java (revision 329884)
+++ src/java/org/apache/mina/transport/socket/nio/SocketSession.java (working copy)
@@ -48,4 +48,8 @@
void setReceiveBufferSize( int size ) throws SocketException;
int getSessionReceiveBufferSize();
void setSessionReceiveBufferSize( int size );
+ void suspendRead();
+ void resumeRead();
+ void suspendWrite();
+ void resumeWrite();
}