Author: trustin Date: Sat Dec 18 18:11:24 2004 New Revision: 122729 URL: http://svn.apache.org/viewcvs?view=rev&rev=122729 Log: Added filter chaining feature to I/O layer. :)
* Added: IoHandlerFilterManager which manages filter chain and fires events * Added: (Acceptor|Connector).(addFilter|removeFilter) method Added: incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java (contents, props changed) Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/Acceptor.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/Connector.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoSession.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpSession.java Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/Acceptor.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/Acceptor.java?view=diff&rev=122729&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/Acceptor.java&r1=122728&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/Acceptor.java&r2=122729 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/Acceptor.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/Acceptor.java Sat Dec 18 18:11:24 2004 @@ -34,4 +34,8 @@ throws IOException; void unbind( SocketAddress address ); + + void addFilter(int priority, IoHandlerFilter filter); + + void removeFilter(IoHandlerFilter filter); } Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/Connector.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/Connector.java?view=diff&rev=122729&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/Connector.java&r1=122728&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/Connector.java&r2=122729 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/Connector.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/Connector.java Sat Dec 18 18:11:24 2004 @@ -35,4 +35,8 @@ void connect( SocketAddress address, int timeout, IoHandler defaultHandler ) throws IOException; + + void addFilter(int priority, IoHandlerFilter filter); + + void removeFilter(IoHandlerFilter filter); } Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoSession.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoSession.java?view=diff&rev=122729&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoSession.java&r1=122728&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoSession.java&r2=122729 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoSession.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoSession.java Sat Dec 18 18:11:24 2004 @@ -31,6 +31,8 @@ */ public interface IoSession { + IoHandler getHandler(); + void close(); Object getAttachment(); Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java?view=diff&rev=122729&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java&r1=122728&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java&r2=122729 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java Sat Dec 18 18:11:24 2004 @@ -34,6 +34,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.mina.io.Acceptor; import org.apache.mina.io.IoHandler; +import org.apache.mina.io.IoHandlerFilter; +import org.apache.mina.util.IoHandlerFilterManager; /** * TODO Insert type comment. @@ -47,6 +49,8 @@ private static final Log log = LogFactory.getLog( TcpAcceptor.class ); + private final IoHandlerFilterManager filterManager = new IoHandlerFilterManager(); + private final int id = nextId++; private final Selector selector; @@ -101,7 +105,7 @@ Validate.notNull( address ); ServerSocketChannel ssc = ( ServerSocketChannel ) channels - .get( address ); + .get( address ); if( ssc == null ) throw new IllegalArgumentException( "Unknown address: " + address ); @@ -149,16 +153,17 @@ continue; ServerSocketChannel ssc = ( ServerSocketChannel ) key - .channel(); + .channel(); SocketChannel ch = ssc.accept(); if( ch == null ) continue; TcpSession session = new TcpSession( + filterManager, ch, ( IoHandler ) key - .attachment() ); + .attachment() ); TcpIoProcessor.getInstance().addSession( session ); } } @@ -168,5 +173,15 @@ } } } + } + + public void addFilter( int priority, IoHandlerFilter filter ) + { + filterManager.addFilter( priority, filter ); + } + + public void removeFilter( IoHandlerFilter filter ) + { + filterManager.removeFilter( filter ); } } Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java?view=diff&rev=122729&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java&r1=122728&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java&r2=122729 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java Sat Dec 18 18:11:24 2004 @@ -33,6 +33,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.mina.io.Connector; import org.apache.mina.io.IoHandler; +import org.apache.mina.io.IoHandlerFilter; +import org.apache.mina.util.IoHandlerFilterManager; /** * TODO Insert type comment. TODO Stop worker thread when not used. @@ -48,6 +50,8 @@ private final int id = nextId++; + private final IoHandlerFilterManager filterManager = new IoHandlerFilterManager(); + private final Selector selector; private Worker worker; @@ -200,7 +204,7 @@ private void newSession( SocketChannel ch, IoHandler handler ) { - TcpSession session = new TcpSession( ch, handler ); + TcpSession session = new TcpSession( filterManager, ch, handler ); TcpIoProcessor.getInstance().addSession( session ); } @@ -247,5 +251,16 @@ this.deadline = System.currentTimeMillis() + timeout * 1000L; this.handler = handler; } + } + + public void addFilter( int priority, IoHandlerFilter filter ) + { + filterManager.addFilter( priority, filter ); + + } + + public void removeFilter( IoHandlerFilter filter ) + { + filterManager.removeFilter( filter ); } } Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java?view=diff&rev=122729&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java&r1=122728&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java&r2=122729 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java Sat Dec 18 18:11:24 2004 @@ -159,12 +159,12 @@ catch( IOException e ) { registered = false; - fireExceptionCaught( session, e ); + session.getFilterManager().fireExceptionCaught( session, e ); } if( registered ) { - fireSessionOpened( session ); + session.getFilterManager().fireSessionOpened( session ); } } } @@ -196,11 +196,11 @@ } catch( IOException e ) { - fireExceptionCaught( session, e ); + session.getFilterManager().fireExceptionCaught( session, e ); } finally { - fireSessionClosed( session ); + session.getFilterManager().fireSessionClosed( session ); } } } @@ -263,7 +263,8 @@ if( readBytes > 0 ) { lock.markBaseIndex(); - fireDataRead( session, readBytes ); + session.getFilterManager().fireDataRead( session, + readBytes ); } else { @@ -280,7 +281,7 @@ } catch( Throwable e ) { - fireExceptionCaught( session, e ); + session.getFilterManager().fireExceptionCaught( session, e ); } } @@ -346,7 +347,7 @@ && ( currentTime - lastIoTime ) >= idleTime ) { session.setIdle( status ); - fireSessionIdle( session, status ); + session.getFilterManager().fireSessionIdle( session, status ); } } @@ -385,7 +386,7 @@ synchronized( lock ) { writeBuf.flip(); - + // ignore empty write buffer if( writeBuf.remaining() == 0 ) { @@ -399,7 +400,9 @@ if( marker == null ) break; - fireMarkerRemoved( session, marker.getValue() ); + session.getFilterManager() + .fireMarkerReleased( session, + marker.getValue() ); } } else @@ -434,7 +437,8 @@ { session.increaseWrittenBytes( writtenBytes ); lock.markBaseIndex(); - fireDataWritten( session, writtenBytes ); + session.getFilterManager() + .fireDataWritten( session, writtenBytes ); Queue markers = lock.getMarkers(); for( ;; ) { @@ -452,13 +456,19 @@ else if( bytesLeft == writtenBytes ) { markers.pop(); - fireMarkerRemoved( session, marker.getValue() ); + session + .getFilterManager() + .fireMarkerReleased( session, + marker.getValue() ); break; } else { markers.pop(); - fireMarkerRemoved( session, marker.getValue() ); + session + .getFilterManager() + .fireMarkerReleased( session, + marker.getValue() ); writtenBytes -= bytesLeft; } } @@ -468,96 +478,7 @@ } catch( IOException e ) { - fireExceptionCaught( session, e ); - } - } - - private void fireSessionOpened( TcpSession session ) - { - try - { - session.getHandler().sessionOpened( session ); - } - catch( Throwable e ) - { - fireExceptionCaught( session, e ); - } - } - - private void fireSessionClosed( TcpSession session ) - { - try - { - session.getHandler().sessionClosed( session ); - } - catch( Throwable e ) - { - fireExceptionCaught( session, e ); - } - } - - private void fireSessionIdle( TcpSession session, IdleStatus status ) - { - try - { - session.getHandler().sessionIdle( session, status ); - } - catch( Throwable e ) - { - fireExceptionCaught( session, e ); - } - } - - private void fireDataRead( TcpSession session, int readBytes ) - { - try - { - session.getHandler().dataRead( session, readBytes ); - } - catch( Throwable e ) - { - fireExceptionCaught( session, e ); - } - } - - private void fireDataWritten( TcpSession session, int writtenBytes ) - { - try - { - session.getHandler().dataWritten( session, writtenBytes ); - } - catch( Throwable e ) - { - fireExceptionCaught( session, e ); - } - } - - private void fireMarkerRemoved( TcpSession session, Object marker ) - { - try - { - session.getHandler().markerReleased( session, marker ); - } - catch( Throwable e ) - { - fireExceptionCaught( session, e ); - } - } - - private void fireExceptionCaught( TcpSession session, Throwable cause ) - { - try - { - session.getHandler().exceptionCaught( session, cause ); - - if( cause instanceof IOException ) - { - scheduleRemove( session ); - } - } - catch( Throwable t ) - { - log.error( "Exception from excaptionCaught.", t ); + session.getFilterManager().fireExceptionCaught( session, e ); } } Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpSession.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpSession.java?view=diff&rev=122729&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpSession.java&r1=122728&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpSession.java&r2=122729 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpSession.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpSession.java Sat Dec 18 18:11:24 2004 @@ -25,11 +25,12 @@ import org.apache.mina.core.IdleStatus; import org.apache.mina.core.SessionConfig; -import org.apache.mina.io.ReadBuffer; -import org.apache.mina.io.IoSession; import org.apache.mina.io.IoHandler; +import org.apache.mina.io.IoSession; +import org.apache.mina.io.ReadBuffer; import org.apache.mina.io.WriteBuffer; import org.apache.mina.util.ByteBufferPool; +import org.apache.mina.util.IoHandlerFilterManager; /** * TODO Insert type comment. @@ -39,6 +40,8 @@ */ class TcpSession implements IoSession { + private final IoHandlerFilterManager filterManager; + private final SocketChannel ch; private final TcpSessionConfig config; @@ -70,8 +73,9 @@ /** * Creates a new instance. */ - TcpSession( SocketChannel ch, IoHandler defaultHandler ) + TcpSession( IoHandlerFilterManager filterManager, SocketChannel ch, IoHandler defaultHandler ) { + this.filterManager = filterManager; this.ch = ch; this.config = new TcpSessionConfig( ch ); this.readBuf = new TcpReadBuffer( @@ -83,17 +87,16 @@ this.writeBuf = new TcpWriteBuffer( this, ByteBufferPool.open() ); this.handler = defaultHandler; } + + IoHandlerFilterManager getFilterManager() { + return filterManager; + } SocketChannel getChannel() { return ch; } - IoHandler getHandler() - { - return handler; - } - SelectionKey getSelectionKey() { return key; @@ -108,6 +111,11 @@ { ByteBufferPool.close( readBuf.buf() ); ByteBufferPool.close( writeBuf.buf() ); + } + + public IoHandler getHandler() + { + return handler; } public void close() Added: incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java?view=auto&rev=122729 ============================================================================== --- (empty file) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java Sat Dec 18 18:11:24 2004 @@ -0,0 +1,351 @@ +/* + * @(#) $Id$ + */ +package org.apache.mina.util; + +import org.apache.mina.core.IdleStatus; +import org.apache.mina.io.IoHandler; +import org.apache.mina.io.IoHandlerFilter; +import org.apache.mina.io.IoSession; + +/** + * TODO Document me. + * + * @author Trustin Lee ([EMAIL PROTECTED]) + * @version $Rev$, $Date$ + */ +public class IoHandlerFilterManager +{ + private static final IoHandlerFilter FINAL_FILTER = new IoHandlerFilter() + { + + public void sessionOpened( IoHandler nextHandler, IoSession session ) + { + session.getHandler().sessionOpened( session ); + } + + public void sessionClosed( IoHandler nextHandler, IoSession session ) + { + session.getHandler().sessionClosed( session ); + } + + public void sessionIdle( IoHandler nextHandler, IoSession session, + IdleStatus status ) + { + session.getHandler().sessionIdle( session, status ); + } + + public void exceptionCaught( IoHandler nextHandler, IoSession session, + Throwable cause ) + { + session.getHandler().exceptionCaught( session, cause ); + } + + public void dataRead( IoHandler nextHandler, IoSession session, + int readBytes ) + { + session.getHandler().dataRead( session, readBytes ); + } + + public void dataWritten( IoHandler nextHandler, IoSession session, + int writtenBytes ) + { + session.getHandler().dataWritten( session, writtenBytes ); + } + + public void markerReleased( IoHandler nextHandler, IoSession session, + Object marker ) + { + session.getHandler().markerReleased( session, marker ); + } + }; + + private Entry head = new Entry( null, Integer.MIN_VALUE, FINAL_FILTER ); + + public IoHandlerFilterManager() + { + } + + public synchronized void addFilter( int priority, IoHandlerFilter filter ) + { + Entry e = head; + Entry prevEntry = null; + for( ;; ) + { + if( e.nextEntry == null ) + { + Entry newEntry = new Entry( e, priority, filter ); + if( prevEntry == null ) + { + head = newEntry; + } + else + { + prevEntry.nextEntry = newEntry; + } + break; + } + else if( e.priority < priority ) + { + Entry newEntry = new Entry( e, priority, filter ); + if( prevEntry == null ) + { + head = newEntry; + } + else + { + prevEntry.nextEntry = newEntry; + } + break; + } + prevEntry = e; + e = e.nextEntry; + } + } + + public synchronized void removeFilter( IoHandlerFilter filter ) + { + Entry e = head; + Entry prevEntry = null; + for( ;; ) + { + if( e.nextEntry == null ) + { + break; + } + else if( e.filter == filter ) + { + if( prevEntry == null ) + { + // e is head + head = e.nextEntry; + } + else + { + prevEntry.nextEntry = e.nextEntry; + } + break; + } + prevEntry = e; + e = e.nextEntry; + } + } + + public void fireSessionOpened( IoSession session ) + { + Entry head = this.head; + try + { + head.filter.sessionOpened( head.nextHandler, session ); + } + catch( Throwable e ) + { + fireExceptionCaught( session, e ); + } + } + + public void fireSessionClosed( IoSession session ) + { + Entry head = this.head; + try + { + head.filter.sessionClosed( head.nextHandler, session ); + } + catch( Throwable e ) + { + fireExceptionCaught( session, e ); + } + } + + public void fireSessionIdle( IoSession session, IdleStatus status ) + { + Entry head = this.head; + try + { + head.filter.sessionIdle( head.nextHandler, session, status ); + } + catch( Throwable e ) + { + fireExceptionCaught( session, e ); + } + } + + public void fireDataRead( IoSession session, int readBytes ) + { + Entry head = this.head; + try + { + head.filter.dataRead( head.nextHandler, session, readBytes ); + } + catch( Throwable e ) + { + fireExceptionCaught( session, e ); + } + } + + public void fireDataWritten( IoSession session, int dataWritten ) + { + Entry head = this.head; + try + { + head.filter.dataWritten( head.nextHandler, session, dataWritten ); + } + catch( Throwable e ) + { + fireExceptionCaught( session, e ); + } + } + + public void fireMarkerReleased( IoSession session, Object marker ) + { + Entry head = this.head; + try + { + head.filter.markerReleased( head.nextHandler, session, marker ); + } + catch( Throwable e ) + { + fireExceptionCaught( session, e ); + } + } + + public void fireExceptionCaught( IoSession session, Throwable cause ) + { + Entry head = this.head; + try + { + head.filter.exceptionCaught( head.nextHandler, session, cause ); + } + catch( Throwable e ) + { + e.printStackTrace(); + } + } + + private static class Entry + { + private Entry nextEntry; + + private final int priority; + + private final IoHandlerFilter filter; + + private final IoHandler nextHandler; + + private Entry( Entry nextEntry, int priority, IoHandlerFilter filter ) + { + if( filter == null ) + throw new NullPointerException( "filter" ); + this.nextEntry = nextEntry; + this.priority = priority; + this.filter = filter; + this.nextHandler = new IoHandler() + { + + public void sessionOpened( IoSession session ) + { + try + { + Entry.this.nextEntry.filter + .sessionOpened( + Entry.this.nextEntry.nextHandler, + session ); + } + catch( Throwable e ) + { + exceptionCaught( session, e ); + } + } + + public void sessionClosed( IoSession session ) + { + try + { + Entry.this.nextEntry.filter + .sessionClosed( + Entry.this.nextEntry.nextHandler, + session ); + } + catch( Throwable e ) + { + exceptionCaught( session, e ); + } + } + + public void sessionIdle( IoSession session, IdleStatus status ) + { + try + { + Entry.this.nextEntry.filter + .sessionIdle( + Entry.this.nextEntry.nextHandler, + session, status ); + } + catch( Throwable e ) + { + exceptionCaught( session, e ); + } + } + + public void exceptionCaught( IoSession session, Throwable cause ) + { + try + { + Entry.this.nextEntry.filter + .exceptionCaught( + Entry.this.nextEntry.nextHandler, + session, cause ); + } + catch( Throwable e ) + { + e.printStackTrace(); + } + } + + public void dataRead( IoSession session, int readBytes ) + { + try + { + Entry.this.nextEntry.filter + .dataRead( Entry.this.nextEntry.nextHandler, + session, readBytes ); + } + catch( Throwable e ) + { + exceptionCaught( session, e ); + } + } + + public void dataWritten( IoSession session, int writtenBytes ) + { + try + { + Entry.this.nextEntry.filter + .dataWritten( + Entry.this.nextEntry.nextHandler, + session, writtenBytes ); + } + catch( Throwable e ) + { + exceptionCaught( session, e ); + } + } + + public void markerReleased( IoSession session, Object marker ) + { + try + { + Entry.this.nextEntry.filter + .markerReleased( + Entry.this.nextEntry.nextHandler, + session, marker ); + } + catch( Throwable e ) + { + exceptionCaught( session, e ); + } + } + }; + } + } +} \ No newline at end of file
