Author: trustin Date: Tue Jan 11 23:13:42 2005 New Revision: 124943 URL: http://svn.apache.org/viewcvs?view=rev&rev=124943 Log: * Session.write() operation is now filtered: * Added IoHandlerFilter.filterWrite(ByteBuffer) and ProtocolHandlerFiler.filterWrite(Object) * Added IoHandlerFilterManager.write() and ProtocolHandlerFilterManager.write() to filter write operations. * Prohibited wrapping IoSessions and ProtocolSessions in filters for the case user caches session instance. * Filter priorities must be unique now to avoid confusion.
Added: incubator/directory/network/trunk/mina/src/test/org/apache/mina/util/ incubator/directory/network/trunk/mina/src/test/org/apache/mina/util/IoHandlerFilterImpl.java (contents, props changed) incubator/directory/network/trunk/mina/src/test/org/apache/mina/util/IoHandlerFilterManagerTest.java (contents, props changed) incubator/directory/network/trunk/mina/src/test/org/apache/mina/util/ProtocolHandlerFilterImpl.java (contents, props changed) incubator/directory/network/trunk/mina/src/test/org/apache/mina/util/ProtocolHandlerFilterManagerTest.java (contents, props changed) Modified: incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/AbstractMessage.java incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/AddMessage.java incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/Client.java incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ClientProtocolProvider.java incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ClientSessionHandler.java incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/Constants.java incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ResultMessage.java incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/Server.java incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ServerProtocolProvider.java incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ServerSessionHandler.java incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/SumUpMessageRecognizer.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/Connector.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramConnector.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSession.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/filter/IoThreadPoolFilter.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketConnector.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketIoProcessor.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketSession.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/IoAdapter.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilter.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilterAdapter.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/codec/Asn1CodecDecoder.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/codec/Asn1CodecEncoder.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/codec/NettyDecoder.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/codec/NettyEncoder.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/filter/ProtocolThreadPoolFilter.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/vmpipe/VmPipeSession.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/ProtocolHandlerFilterManager.java Modified: incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/AbstractMessage.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/AbstractMessage.java?view=diff&rev=124943&p1=incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/AbstractMessage.java&r1=124942&p2=incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/AbstractMessage.java&r2=124943 ============================================================================== --- incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/AbstractMessage.java (original) +++ incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/AbstractMessage.java Tue Jan 11 23:13:42 2005 @@ -14,89 +14,105 @@ * @author Trustin Lee ([EMAIL PROTECTED]) * @version $Rev$, $Date$ */ -public abstract class AbstractMessage implements Message { +public abstract class AbstractMessage implements Message +{ - private final int type; + private final int type; - private int sequence; + private int sequence; - private boolean readHeader; + private boolean readHeader; - private boolean wroteHeader; - - protected AbstractMessage(int type) { - this.type = type; - } - - public int getSequence() { - return sequence; - } - - public void setSequence(int sequence) { - this.sequence = sequence; - } - - public final boolean read(ByteBuffer buf) throws MessageParseException { - // read a header if not read yet. - if (!readHeader) { - readHeader = readHeader(buf); - if (!readHeader) - return false; - } - - // Header is read, now try to read body - if (readBody(buf)) { - // finished reading single complete message - readHeader = false; // reset state - return true; - } else - return false; - } - - private boolean readHeader(ByteBuffer buf) throws MessageParseException { - // if header is not fully read, don't read it. - if (buf.remaining() < Constants.HEADER_LEN) - return false; - - // read header and validate the message - int readType = buf.getShort(); - if (type != readType) - throw new MessageParseException("type mismatches: " + readType - + " (expected: " + type + ')'); - - // read sequence number of the message - sequence = buf.getInt(); - return true; - } - - protected abstract boolean readBody(ByteBuffer buf) - throws MessageParseException; - - public boolean write(ByteBuffer buf) { - // write a header if not written yet. - if (!wroteHeader) { - wroteHeader = writeHeader(buf); - if (!wroteHeader) - return false; // buffer is almost full perhaps - } - - // Header is written, now try to write body - if (writeBody(buf)) { - // finished writing single complete message - wroteHeader = false; - return true; - } else { - return false; - } - } - - private boolean writeHeader(ByteBuffer buf) { - // check if there is enough space to write header - if (buf.remaining() < Constants.HEADER_LEN) return false; - buf.putShort((short) type); - buf.putInt(sequence); - return true; - } + private boolean wroteHeader; + + protected AbstractMessage( int type ) + { + this.type = type; + } + + public int getSequence() + { + return sequence; + } + + public void setSequence( int sequence ) + { + this.sequence = sequence; + } + + public final boolean read( ByteBuffer buf ) throws MessageParseException + { + // read a header if not read yet. + if( !readHeader ) + { + readHeader = readHeader( buf ); + if( !readHeader ) + return false; + } + + // Header is read, now try to read body + if( readBody( buf ) ) + { + // finished reading single complete message + readHeader = false; // reset state + return true; + } + else + return false; + } + + private boolean readHeader( ByteBuffer buf ) throws MessageParseException + { + // if header is not fully read, don't read it. + if( buf.remaining() < Constants.HEADER_LEN ) + return false; + + // read header and validate the message + int readType = buf.getShort(); + if( type != readType ) + throw new MessageParseException( "type mismatches: " + readType + + " (expected: " + type + ')' ); + + // read sequence number of the message + sequence = buf.getInt(); + return true; + } + + protected abstract boolean readBody( ByteBuffer buf ) + throws MessageParseException; + + public boolean write( ByteBuffer buf ) + { + // write a header if not written yet. + if( !wroteHeader ) + { + wroteHeader = writeHeader( buf ); + if( !wroteHeader ) + return false; // buffer is almost full perhaps + } + + // Header is written, now try to write body + if( writeBody( buf ) ) + { + // finished writing single complete message + wroteHeader = false; + return true; + } + else + { + return false; + } + } + + private boolean writeHeader( ByteBuffer buf ) + { + // check if there is enough space to write header + if( buf.remaining() < Constants.HEADER_LEN ) + return false; + buf.putShort( ( short ) type ); + buf.putInt( sequence ); + return true; + } - protected abstract boolean writeBody(ByteBuffer buf); + protected abstract boolean writeBody( ByteBuffer buf ); } Modified: incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/AddMessage.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/AddMessage.java?view=diff&rev=124943&p1=incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/AddMessage.java&r1=124942&p2=incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/AddMessage.java&r2=124943 ============================================================================== --- incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/AddMessage.java (original) +++ incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/AddMessage.java Tue Jan 11 23:13:42 2005 @@ -13,41 +13,49 @@ * @author Trustin Lee ([EMAIL PROTECTED]) * @version $Rev$, $Date$ */ -public class AddMessage extends AbstractMessage { - - private int value; - - protected AddMessage() { - super(Constants.ADD); - } - - public int getValue() { - return value; - } - - public void setValue(int value) { - this.value = value; - } - - protected boolean readBody(ByteBuffer buf) throws MessageParseException { - // don't read body if it is partially readable - if (buf.remaining() < Constants.ADD_BODY_LEN) return false; - value = buf.getInt(); - return true; - } - - protected boolean writeBody(ByteBuffer buf) { - // check if there is enough space to write body - if (buf.remaining() < Constants.ADD_BODY_LEN) - return false; - - buf.putInt(value); - - return true; - } - - public String toString() { - // it is a good practice to create toString() method on message classes. - return getSequence() + ":ADD(" + value + ')'; - } -} +public class AddMessage extends AbstractMessage +{ + + private int value; + + protected AddMessage() + { + super( Constants.ADD ); + } + + public int getValue() + { + return value; + } + + public void setValue( int value ) + { + this.value = value; + } + + protected boolean readBody( ByteBuffer buf ) throws MessageParseException + { + // don't read body if it is partially readable + if( buf.remaining() < Constants.ADD_BODY_LEN ) + return false; + value = buf.getInt(); + return true; + } + + protected boolean writeBody( ByteBuffer buf ) + { + // check if there is enough space to write body + if( buf.remaining() < Constants.ADD_BODY_LEN ) + return false; + + buf.putInt( value ); + + return true; + } + + public String toString() + { + // it is a good practice to create toString() method on message classes. + return getSequence() + ":ADD(" + value + ')'; + } +} \ No newline at end of file Modified: incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/Client.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/Client.java?view=diff&rev=124943&p1=incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/Client.java&r1=124942&p2=incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/Client.java&r2=124943 ============================================================================== --- incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/Client.java (original) +++ incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/Client.java Tue Jan 11 23:13:42 2005 @@ -19,22 +19,28 @@ * @author Trustin Lee ([EMAIL PROTECTED]) * @version $Rev$, $Date$ */ -public class Client { - private static final String HOSTNAME = "localhost"; - private static final int PORT = 8080; - private static final int CONNECT_TIMEOUT = 30; // seconds - - public static void main(String[] args) throws Throwable { - if (args.length == 0) { - System.out.println("Please specify the list of any integers"); - return; - } - - // prepare values to sum up - int[] values = new int[args.length]; - for (int i = 0; i < args.length; i++) { - values[i] = Integer.parseInt(args[i]); - } +public class Client +{ + private static final String HOSTNAME = "localhost"; + + private static final int PORT = 8080; + + private static final int CONNECT_TIMEOUT = 30; // seconds + + public static void main( String[] args ) throws Throwable + { + if( args.length == 0 ) + { + System.out.println( "Please specify the list of any integers" ); + return; + } + + // prepare values to sum up + int[] values = new int[ args.length ]; + for( int i = 0; i < args.length; i++ ) + { + values[ i ] = Integer.parseInt( args[ i ] ); + } // Create I/O and Protocol thread pool filter. // I/O thread pool performs encoding and decoding of messages. @@ -46,31 +52,39 @@ ioThreadPoolFilter.start(); protocolThreadPoolFilter.start(); - Connector connector = new SocketConnector(); - IoAdapter adapter = new IoAdapter(); - - connector.addFilter(Integer.MAX_VALUE, ioThreadPoolFilter); - adapter.addFilter(Integer.MAX_VALUE, protocolThreadPoolFilter); - - ProtocolProvider protocolProvider = new ClientProtocolProvider(values); - for ( ;; ) { - try { - connector.connect(new InetSocketAddress(HOSTNAME, PORT), CONNECT_TIMEOUT, adapter.adapt(protocolProvider)); - break; - } catch (IOException e) { - System.err.println("Failed to connect."); - e.printStackTrace(); - Thread.sleep(5000); - } - } - - // wait until the summation is done - ClientSessionHandler sessionHandler = (ClientSessionHandler) protocolProvider.getHandler(); - while ( sessionHandler.isFinished() ) { - Thread.sleep(100); - } - - ioThreadPoolFilter.stop(); - protocolThreadPoolFilter.stop(); - } + Connector connector = new SocketConnector(); + IoAdapter adapter = new IoAdapter(); + + connector.addFilter( Integer.MAX_VALUE, ioThreadPoolFilter ); + adapter.addFilter( Integer.MAX_VALUE, protocolThreadPoolFilter ); + + ProtocolProvider protocolProvider = new ClientProtocolProvider( values ); + for( ;; ) + { + try + { + connector.connect( new InetSocketAddress( HOSTNAME, PORT ), + CONNECT_TIMEOUT, adapter + .adapt( protocolProvider ) ); + break; + } + catch( IOException e ) + { + System.err.println( "Failed to connect." ); + e.printStackTrace(); + Thread.sleep( 5000 ); + } + } + + // wait until the summation is done + ClientSessionHandler sessionHandler = ( ClientSessionHandler ) protocolProvider + .getHandler(); + while( sessionHandler.isFinished() ) + { + Thread.sleep( 100 ); + } + + ioThreadPoolFilter.stop(); + protocolThreadPoolFilter.stop(); + } } Modified: incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ClientProtocolProvider.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ClientProtocolProvider.java?view=diff&rev=124943&p1=incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ClientProtocolProvider.java&r1=124942&p2=incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ClientProtocolProvider.java&r2=124943 ============================================================================== --- incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ClientProtocolProvider.java (original) +++ incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ClientProtocolProvider.java Tue Jan 11 23:13:42 2005 @@ -17,30 +17,39 @@ * @author Trustin Lee ([EMAIL PROTECTED]) * @version $Rev$, $Date$, */ -public class ClientProtocolProvider implements ProtocolProvider { +public class ClientProtocolProvider implements ProtocolProvider +{ - private static final ProtocolCodecFactory CODEC_FACTORY = new ProtocolCodecFactory() { + private static final ProtocolCodecFactory CODEC_FACTORY = new ProtocolCodecFactory() + { - public ProtocolEncoder newEncoder() { - return new NettyEncoder(); - } - - public ProtocolDecoder newDecoder() { - return new NettyDecoder(new SumUpMessageRecognizer(SumUpMessageRecognizer.CLIENT_MODE)); - } - }; - - private final ProtocolHandler handler; - - public ClientProtocolProvider(int[] values) { - handler = new ClientSessionHandler(values); - } - - public ProtocolCodecFactory getCodecFactory() { - return CODEC_FACTORY; - } - - public ProtocolHandler getHandler() { - return handler; - } -} + public ProtocolEncoder newEncoder() + { + return new NettyEncoder(); + } + + public ProtocolDecoder newDecoder() + { + return new NettyDecoder( + new SumUpMessageRecognizer( + SumUpMessageRecognizer.CLIENT_MODE ) ); + } + }; + + private final ProtocolHandler handler; + + public ClientProtocolProvider( int[] values ) + { + handler = new ClientSessionHandler( values ); + } + + public ProtocolCodecFactory getCodecFactory() + { + return CODEC_FACTORY; + } + + public ProtocolHandler getHandler() + { + return handler; + } +} \ No newline at end of file Modified: incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ClientSessionHandler.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ClientSessionHandler.java?view=diff&rev=124943&p1=incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ClientSessionHandler.java&r1=124942&p2=incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ClientSessionHandler.java&r2=124943 ============================================================================== --- incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ClientSessionHandler.java (original) +++ incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ClientSessionHandler.java Tue Jan 11 23:13:42 2005 @@ -13,67 +13,82 @@ * @author Trustin Lee ([EMAIL PROTECTED]) * @version $Rev$, $Date$ */ -public class ClientSessionHandler implements ProtocolHandler { - private final int[] values; - private boolean finished; - - public ClientSessionHandler(int[] values) { - this.values = values; - } - - public boolean isFinished() { - return finished; - } - - public void sessionOpened(ProtocolSession session) { - System.out.println("OPENED"); - // send summation requests - for (int i = 0; i < values.length; i++) { - AddMessage m = new AddMessage(); - m.setSequence(i); - m.setValue(values[i]); - session.write(m); - } - } - - public void sessionClosed(ProtocolSession session) { - System.out.println("CLOSED"); - } - - public void messageReceived(ProtocolSession session, Object message) { - System.out.println("RCVD: " + message); - // server only sends ResultMessage. otherwise, we will have to identify - // its type using instanceof operator. - ResultMessage rm = (ResultMessage) message; - if (rm.isOk()) { - // server returned OK code. - // if received the result message which has the last sequence - // number, - // it is time to disconnect. - if (rm.getSequence() == values.length - 1) { - // print the sum and disconnect. - System.out.println("The sum: " + rm.getValue()); - session.close(); - finished = true; - } - } else { - // seever returned error code because of overflow, etc. - System.err.println("Server error, disconnecting..."); - session.close(); - finished = true; - } - } - - public void messageSent(ProtocolSession session, Object message) { - System.out.println("SENT: " + message); - } - - public void sessionIdle(ProtocolSession session, IdleStatus status) { - // there is no idle time for client - } - - public void exceptionCaught(ProtocolSession session, Throwable cause) { - cause.printStackTrace(); - session.close(); - } +public class ClientSessionHandler implements ProtocolHandler +{ + private final int[] values; + + private boolean finished; + + public ClientSessionHandler( int[] values ) + { + this.values = values; + } + + public boolean isFinished() + { + return finished; + } + + public void sessionOpened( ProtocolSession session ) + { + System.out.println( "OPENED" ); + // send summation requests + for( int i = 0; i < values.length; i++ ) + { + AddMessage m = new AddMessage(); + m.setSequence( i ); + m.setValue( values[ i ] ); + session.write( m ); + } + } + + public void sessionClosed( ProtocolSession session ) + { + System.out.println( "CLOSED" ); + } + + public void messageReceived( ProtocolSession session, Object message ) + { + System.out.println( "RCVD: " + message ); + // server only sends ResultMessage. otherwise, we will have to identify + // its type using instanceof operator. + ResultMessage rm = ( ResultMessage ) message; + if( rm.isOk() ) + { + // server returned OK code. + // if received the result message which has the last sequence + // number, + // it is time to disconnect. + if( rm.getSequence() == values.length - 1 ) + { + // print the sum and disconnect. + System.out.println( "The sum: " + rm.getValue() ); + session.close(); + finished = true; + } + } + else + { + // seever returned error code because of overflow, etc. + System.err.println( "Server error, disconnecting..." ); + session.close(); + finished = true; + } + } + + public void messageSent( ProtocolSession session, Object message ) + { + System.out.println( "SENT: " + message ); + } + + public void sessionIdle( ProtocolSession session, IdleStatus status ) + { + // there is no idle time for client + } + + public void exceptionCaught( ProtocolSession session, Throwable cause ) + { + cause.printStackTrace(); + session.close(); + } } Modified: incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/Constants.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/Constants.java?view=diff&rev=124943&p1=incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/Constants.java&r1=124942&p2=incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/Constants.java&r2=124943 ============================================================================== --- incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/Constants.java (original) +++ incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/Constants.java Tue Jan 11 23:13:42 2005 @@ -9,23 +9,32 @@ * @author Trustin Lee ([EMAIL PROTECTED]) * @version $Rev$, $Date$ */ -public class Constants { - public static final int TYPE_LEN = 2; - public static final int SEQUENCE_LEN = 4; - public static final int HEADER_LEN = TYPE_LEN + SEQUENCE_LEN; - public static final int BODY_LEN = 12; - - public static final int RESULT = 0; - public static final int ADD = 1; - - public static final int RESULT_CODE_LEN = 2; - public static final int RESULT_VALUE_LEN = 4; - public static final int ADD_BODY_LEN = 4; - - public static final int RESULT_OK = 0; - public static final int RESULT_ERROR = 1; - - private Constants() { - } +public class Constants +{ + public static final int TYPE_LEN = 2; -} + public static final int SEQUENCE_LEN = 4; + + public static final int HEADER_LEN = TYPE_LEN + SEQUENCE_LEN; + + public static final int BODY_LEN = 12; + + public static final int RESULT = 0; + + public static final int ADD = 1; + + public static final int RESULT_CODE_LEN = 2; + + public static final int RESULT_VALUE_LEN = 4; + + public static final int ADD_BODY_LEN = 4; + + public static final int RESULT_OK = 0; + + public static final int RESULT_ERROR = 1; + + private Constants() + { + } + +} \ No newline at end of file Modified: incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ResultMessage.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ResultMessage.java?view=diff&rev=124943&p1=incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ResultMessage.java&r1=124942&p2=incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ResultMessage.java&r2=124943 ============================================================================== --- incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ResultMessage.java (original) +++ incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ResultMessage.java Tue Jan 11 23:13:42 2005 @@ -13,88 +13,107 @@ * @author Trustin Lee ([EMAIL PROTECTED]) * @version $Rev$, $Date$ */ -public class ResultMessage extends AbstractMessage { +public class ResultMessage extends AbstractMessage +{ - private boolean ok; + private boolean ok; - private int value; + private int value; - private boolean processedResultCode; + private boolean processedResultCode; - public ResultMessage() { - super(Constants.RESULT); - } - - public boolean isOk() { - return ok; - } - - public void setOk(boolean ok) { - this.ok = ok; - } - - public int getValue() { - return value; - } - - public void setValue(int value) { - this.value = value; - } - - protected boolean readBody(ByteBuffer buf) throws MessageParseException { - if (!processedResultCode) { - processedResultCode = readResultCode(buf); - if (!processedResultCode) - return false; - } - - if (ok) { - if (readValue(buf)) { - processedResultCode = false; - return true; - } else - return false; - } else { - processedResultCode = false; - return true; - } - } - - private boolean readResultCode(ByteBuffer buf) { - if (buf.remaining() < Constants.RESULT_CODE_LEN) - return false; - ok = buf.getShort() == Constants.RESULT_OK; - return true; - } - - private boolean readValue(ByteBuffer buf) { - if (buf.remaining() < Constants.RESULT_VALUE_LEN) - return false; - value = buf.getInt(); - return true; - - } - - protected boolean writeBody(ByteBuffer buf) { - // check if there is enough space to write body - if (buf.remaining() < Constants.RESULT_CODE_LEN - + Constants.RESULT_VALUE_LEN) - return false; - - buf - .putShort((short) (ok ? Constants.RESULT_OK - : Constants.RESULT_ERROR)); - if (ok) - buf.putInt(value); - - return true; - } - - public String toString() { - if (ok) { - return getSequence() + ":RESULT(" + value + ')'; - } else { - return getSequence() + ":RESULT(ERROR)"; - } - } + public ResultMessage() + { + super( Constants.RESULT ); + } + + public boolean isOk() + { + return ok; + } + + public void setOk( boolean ok ) + { + this.ok = ok; + } + + public int getValue() + { + return value; + } + + public void setValue( int value ) + { + this.value = value; + } + + protected boolean readBody( ByteBuffer buf ) throws MessageParseException + { + if( !processedResultCode ) + { + processedResultCode = readResultCode( buf ); + if( !processedResultCode ) + return false; + } + + if( ok ) + { + if( readValue( buf ) ) + { + processedResultCode = false; + return true; + } + else + return false; + } + else + { + processedResultCode = false; + return true; + } + } + + private boolean readResultCode( ByteBuffer buf ) + { + if( buf.remaining() < Constants.RESULT_CODE_LEN ) + return false; + ok = buf.getShort() == Constants.RESULT_OK; + return true; + } + + private boolean readValue( ByteBuffer buf ) + { + if( buf.remaining() < Constants.RESULT_VALUE_LEN ) + return false; + value = buf.getInt(); + return true; + + } + + protected boolean writeBody( ByteBuffer buf ) + { + // check if there is enough space to write body + if( buf.remaining() < Constants.RESULT_CODE_LEN + + Constants.RESULT_VALUE_LEN ) + return false; + + buf.putShort( ( short ) ( ok ? Constants.RESULT_OK + : Constants.RESULT_ERROR ) ); + if( ok ) + buf.putInt( value ); + + return true; + } + + public String toString() + { + if( ok ) + { + return getSequence() + ":RESULT(" + value + ')'; + } + else + { + return getSequence() + ":RESULT(ERROR)"; + } + } } Modified: incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/Server.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/Server.java?view=diff&rev=124943&p1=incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/Server.java&r1=124942&p2=incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/Server.java&r2=124943 ============================================================================== --- incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/Server.java (original) +++ incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/Server.java Tue Jan 11 23:13:42 2005 @@ -17,10 +17,12 @@ * @author Trustin Lee ([EMAIL PROTECTED]) * @version $Rev$, $Date$ */ -public class Server { - private static final int SERVER_PORT = 8080; +public class Server +{ + private static final int SERVER_PORT = 8080; - public static void main(String[] args) throws Throwable { + public static void main( String[] args ) throws Throwable + { // Create I/O and Protocol thread pool filter. // I/O thread pool performs encoding and decoding of messages. // Protocol thread pool performs actual protocol flow. @@ -31,13 +33,14 @@ ioThreadPoolFilter.start(); protocolThreadPoolFilter.start(); - Acceptor acceptor = new SocketAcceptor(); - IoAdapter adapter = new IoAdapter(); - - acceptor.addFilter(Integer.MAX_VALUE, ioThreadPoolFilter); - adapter.addFilter(Integer.MAX_VALUE, protocolThreadPoolFilter); + Acceptor acceptor = new SocketAcceptor(); + IoAdapter adapter = new IoAdapter(); - acceptor.bind(new InetSocketAddress(SERVER_PORT), adapter.adapt(new ServerProtocolProvider())); - System.out.println("Listening on port " + SERVER_PORT); - } + acceptor.addFilter( Integer.MAX_VALUE, ioThreadPoolFilter ); + adapter.addFilter( Integer.MAX_VALUE, protocolThreadPoolFilter ); + + acceptor.bind( new InetSocketAddress( SERVER_PORT ), adapter + .adapt( new ServerProtocolProvider() ) ); + System.out.println( "Listening on port " + SERVER_PORT ); + } } Modified: incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ServerProtocolProvider.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ServerProtocolProvider.java?view=diff&rev=124943&p1=incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ServerProtocolProvider.java&r1=124942&p2=incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ServerProtocolProvider.java&r2=124943 ============================================================================== --- incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ServerProtocolProvider.java (original) +++ incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ServerProtocolProvider.java Tue Jan 11 23:13:42 2005 @@ -17,29 +17,38 @@ * @author Trustin Lee ([EMAIL PROTECTED]) * @version $Rev$, $Date$, */ -public class ServerProtocolProvider implements ProtocolProvider { +public class ServerProtocolProvider implements ProtocolProvider +{ - private static final ProtocolCodecFactory CODEC_FACTORY = new ProtocolCodecFactory() { + private static final ProtocolCodecFactory CODEC_FACTORY = new ProtocolCodecFactory() + { - public ProtocolEncoder newEncoder() { - return new NettyEncoder(); - } - - public ProtocolDecoder newDecoder() { - return new NettyDecoder(new SumUpMessageRecognizer(SumUpMessageRecognizer.SERVER_MODE)); - } - }; - - private static final ProtocolHandler HANDLER = new ServerSessionHandler(); - - public ServerProtocolProvider() { - } - - public ProtocolCodecFactory getCodecFactory() { - return CODEC_FACTORY; - } - - public ProtocolHandler getHandler() { - return HANDLER; - } -} + public ProtocolEncoder newEncoder() + { + return new NettyEncoder(); + } + + public ProtocolDecoder newDecoder() + { + return new NettyDecoder( + new SumUpMessageRecognizer( + SumUpMessageRecognizer.SERVER_MODE ) ); + } + }; + + private static final ProtocolHandler HANDLER = new ServerSessionHandler(); + + public ServerProtocolProvider() + { + } + + public ProtocolCodecFactory getCodecFactory() + { + return CODEC_FACTORY; + } + + public ProtocolHandler getHandler() + { + return HANDLER; + } +} \ No newline at end of file Modified: incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ServerSessionHandler.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ServerSessionHandler.java?view=diff&rev=124943&p1=incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ServerSessionHandler.java&r1=124942&p2=incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ServerSessionHandler.java&r2=124943 ============================================================================== --- incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ServerSessionHandler.java (original) +++ incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/ServerSessionHandler.java Tue Jan 11 23:13:42 2005 @@ -15,67 +15,78 @@ * @author Trustin Lee ([EMAIL PROTECTED]) * @version $Rev$, $Date$ */ -public class ServerSessionHandler implements ProtocolHandler { +public class ServerSessionHandler implements ProtocolHandler +{ - public ServerSessionHandler() { - } - - public void sessionOpened(ProtocolSession session) { - System.out.println("OPENED"); - // set idle time to 60 seconds - session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 60); - - // initial sum is zero - session.setAttachment(new Integer(0)); - } - - public void sessionClosed(ProtocolSession session) { - System.out.println("CLOSED"); - } - - public void messageReceived(ProtocolSession session, Object message) { - System.out.println("RCVD: " + message); - // client only sends AddMessage. otherwise, we will have to identify - // its type using instanceof operator. - AddMessage am = (AddMessage) message; - - // add the value to the current sum. - int sum = ((Integer) session.getAttachment()).intValue(); - int value = am.getValue(); - long expectedSum = (long) sum + value; - if (expectedSum > Integer.MAX_VALUE || expectedSum < Integer.MIN_VALUE) { - // if the sum overflows or underflows, return error message - ResultMessage rm = new ResultMessage(); - rm.setSequence(am.getSequence()); // copy sequence - rm.setOk(false); - session.write(rm); - } else { - // sum up - sum = (int) expectedSum; - session.setAttachment(new Integer(sum)); - - // return the result message - ResultMessage rm = new ResultMessage(); - rm.setSequence(am.getSequence()); // copy sequence - rm.setOk(true); - rm.setValue(sum); - session.write(rm); - } - } - - public void messageSent(ProtocolSession session, Object message) { - System.out.println("SENT: " + message); - } - - public void sessionIdle(ProtocolSession session, IdleStatus status) { - System.out.println("Disconnecting the idle."); - // disconnect an idle client - session.close(); - } - - public void exceptionCaught(ProtocolSession session, Throwable cause) { - cause.printStackTrace(); - // close the connection on exceptional situation - session.close(); - } + public ServerSessionHandler() + { + } + + public void sessionOpened( ProtocolSession session ) + { + System.out.println( "OPENED" ); + // set idle time to 60 seconds + session.getConfig().setIdleTime( IdleStatus.BOTH_IDLE, 60 ); + + // initial sum is zero + session.setAttachment( new Integer( 0 ) ); + } + + public void sessionClosed( ProtocolSession session ) + { + System.out.println( "CLOSED" ); + } + + public void messageReceived( ProtocolSession session, Object message ) + { + System.out.println( "RCVD: " + message ); + // client only sends AddMessage. otherwise, we will have to identify + // its type using instanceof operator. + AddMessage am = ( AddMessage ) message; + + // add the value to the current sum. + int sum = ( ( Integer ) session.getAttachment() ).intValue(); + int value = am.getValue(); + long expectedSum = ( long ) sum + value; + if( expectedSum > Integer.MAX_VALUE || expectedSum < Integer.MIN_VALUE ) + { + // if the sum overflows or underflows, return error message + ResultMessage rm = new ResultMessage(); + rm.setSequence( am.getSequence() ); // copy sequence + rm.setOk( false ); + session.write( rm ); + } + else + { + // sum up + sum = ( int ) expectedSum; + session.setAttachment( new Integer( sum ) ); + + // return the result message + ResultMessage rm = new ResultMessage(); + rm.setSequence( am.getSequence() ); // copy sequence + rm.setOk( true ); + rm.setValue( sum ); + session.write( rm ); + } + } + + public void messageSent( ProtocolSession session, Object message ) + { + System.out.println( "SENT: " + message ); + } + + public void sessionIdle( ProtocolSession session, IdleStatus status ) + { + System.out.println( "Disconnecting the idle." ); + // disconnect an idle client + session.close(); + } + + public void exceptionCaught( ProtocolSession session, Throwable cause ) + { + cause.printStackTrace(); + // close the connection on exceptional situation + session.close(); + } } Modified: incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/SumUpMessageRecognizer.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/SumUpMessageRecognizer.java?view=diff&rev=124943&p1=incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/SumUpMessageRecognizer.java&r1=124942&p2=incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/SumUpMessageRecognizer.java&r2=124943 ============================================================================== --- incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/SumUpMessageRecognizer.java (original) +++ incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/sumup/SumUpMessageRecognizer.java Tue Jan 11 23:13:42 2005 @@ -17,50 +17,57 @@ * @author Trustin Lee ([EMAIL PROTECTED]) * @version $Rev$, $Date$ */ -public class SumUpMessageRecognizer implements MessageRecognizer { +public class SumUpMessageRecognizer implements MessageRecognizer +{ - public static final int CLIENT_MODE = 1; + public static final int CLIENT_MODE = 1; - public static final int SERVER_MODE = 2; + public static final int SERVER_MODE = 2; - private int mode; + private int mode; - public SumUpMessageRecognizer(int mode) { - switch (mode) { - case CLIENT_MODE: - case SERVER_MODE: - this.mode = mode; - break; - default: - throw new IllegalArgumentException("invalid mode: " + mode); - } - } - - public Message recognize(ByteBuffer buf) throws MessageParseException { - // return null if message type is not arrived yet. - if (buf.remaining() < Constants.TYPE_LEN) - return null; - - int type = buf.getShort(); - switch (mode) { - // server can receive ADD message only. - case SERVER_MODE: - switch (type) { - case Constants.ADD: - return new AddMessage(); - default: - throw new MessageParseException("unknown type: " + type); - } - // client can receive RESULT message only. - case CLIENT_MODE: - switch (type) { - case Constants.RESULT: - return new ResultMessage(); - default: - throw new MessageParseException("unknown type: " + type); - } - default: - throw new InternalError(); // this cannot happen - } - } + public SumUpMessageRecognizer( int mode ) + { + switch( mode ) + { + case CLIENT_MODE: + case SERVER_MODE: + this.mode = mode; + break; + default: + throw new IllegalArgumentException( "invalid mode: " + mode ); + } + } + + public Message recognize( ByteBuffer buf ) throws MessageParseException + { + // return null if message type is not arrived yet. + if( buf.remaining() < Constants.TYPE_LEN ) + return null; + + int type = buf.getShort(); + switch( mode ) + { + // server can receive ADD message only. + case SERVER_MODE: + switch( type ) + { + case Constants.ADD: + return new AddMessage(); + default: + throw new MessageParseException( "unknown type: " + type ); + } + // client can receive RESULT message only. + case CLIENT_MODE: + switch( type ) + { + case Constants.RESULT: + return new ResultMessage(); + default: + throw new MessageParseException( "unknown type: " + type ); + } + default: + throw new InternalError(); // this cannot happen + } + } } Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/Connector.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/Connector.java?view=diff&rev=124943&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/Connector.java&r1=124942&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/Connector.java&r2=124943 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/Connector.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/Connector.java Tue Jan 11 23:13:42 2005 @@ -66,8 +66,8 @@ * * @throws IOException if failed to connect */ - void connect( SocketAddress address, int timeout, - IoHandler defaultHandler ) throws IOException; + void connect( SocketAddress address, int timeout, IoHandler defaultHandler ) + throws IOException; /** * Adds the specified filter with the specified priority. Greater priority Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java?view=diff&rev=124943&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java&r1=124942&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java&r2=124943 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java Tue Jan 11 23:13:42 2005 @@ -34,6 +34,12 @@ * <p> * Please refer to <a href="../../../../../xref/org/apache/mina/io/filter/BlacklistFilter.html"><code>BlacklistFilter</code></a> * example. + * <p> + * <strong>Please NEVER implement your filters to wrap + * [EMAIL PROTECTED] IoSession}s.</strong> Users can cache the reference to the session, + * which might malfunction if any filters are added or removed later. + * Please implement [EMAIL PROTECTED] #filterWrite(ByteBuffer)} method to override + * [EMAIL PROTECTED] IoSession#write(ByteBuffer, Object)} method. * * @author Trustin Lee ([EMAIL PROTECTED]) * @version $Rev$, $Date$ @@ -73,4 +79,9 @@ * Filters [EMAIL PROTECTED] IoHandler#dataWritten(IoSession, Object)} event. */ void dataWritten( IoHandler nextHandler, IoSession session, Object marker ); + + /** + * Filters [EMAIL PROTECTED] IoSession#write(ByteBuffer, Object)} method invocation. + */ + ByteBuffer filterWrite( ByteBuffer buf ); } Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java?view=diff&rev=124943&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java&r1=124942&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java&r2=124943 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java Tue Jan 11 23:13:42 2005 @@ -68,4 +68,9 @@ { nextHandler.dataWritten( session, marker ); } + + public ByteBuffer filterWrite( ByteBuffer buf ) + { + return buf; + } } Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramConnector.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramConnector.java?view=diff&rev=124943&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramConnector.java&r1=124942&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramConnector.java&r2=124943 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramConnector.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramConnector.java Tue Jan 11 23:13:42 2005 @@ -123,7 +123,7 @@ } public void connect( SocketAddress address, int timeout, - IoHandler defaultHandler ) throws IOException + IoHandler defaultHandler ) throws IOException { connect( address, defaultHandler ); } Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSession.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSession.java?view=diff&rev=124943&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSession.java&r1=124942&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSession.java&r2=124943 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSession.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSession.java Tue Jan 11 23:13:42 2005 @@ -30,6 +30,7 @@ import org.apache.mina.io.IoSession; import org.apache.mina.util.IoHandlerFilterManager; import org.apache.mina.util.Queue; +import org.apache.mina.util.IoHandlerFilterManager.WriteCommand; /** * TODO Insert type comment. @@ -55,6 +56,8 @@ private final SocketAddress localAddress; + private final WriteCommand writeCommand = new WriteCommandImpl(); + private SocketAddress remoteAddress; private SelectionKey key; @@ -144,13 +147,7 @@ public void write( ByteBuffer buf, Object marker ) { - synchronized( writeBufferQueue ) - { - writeBufferQueue.push( buf ); - writeMarkerQueue.push( marker ); - } - - parent.flushSession( this ); + filterManager.write( writeCommand, buf, marker ); } public TransportType getTransportType() @@ -245,5 +242,19 @@ else throw new IllegalArgumentException( "Unknown idle status: " + status ); + } + + private class WriteCommandImpl implements WriteCommand + { + public void execute( ByteBuffer buf, Object marker ) + { + synchronized( writeBufferQueue ) + { + writeBufferQueue.push( buf ); + writeMarkerQueue.push( marker ); + } + + parent.flushSession( DatagramSession.this ); + } } } Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/filter/IoThreadPoolFilter.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/filter/IoThreadPoolFilter.java?view=diff&rev=124943&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/filter/IoThreadPoolFilter.java&r1=124942&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/filter/IoThreadPoolFilter.java&r2=124943 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/filter/IoThreadPoolFilter.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/filter/IoThreadPoolFilter.java Tue Jan 11 23:13:42 2005 @@ -543,4 +543,9 @@ } } } + + public ByteBuffer filterWrite( ByteBuffer buf ) + { + return buf; + } } Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketConnector.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketConnector.java?view=diff&rev=124943&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketConnector.java&r1=124942&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketConnector.java&r2=124943 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketConnector.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketConnector.java Tue Jan 11 23:13:42 2005 @@ -72,7 +72,7 @@ } public void connect( SocketAddress address, int timeout, - IoHandler defaultHandler ) throws IOException + IoHandler defaultHandler ) throws IOException { if( address == null ) throw new NullPointerException( "address" ); @@ -145,8 +145,8 @@ { ch.finishConnect(); newSession( ch, entry.handler ); -// SocketSession session = newSession( ch, entry.handler ); -// entry.session = session; + // SocketSession session = newSession( ch, entry.handler ); + // entry.session = session; entry.done = true; synchronized( entry ) @@ -263,7 +263,7 @@ private final IoHandler handler; -// private SocketSession session; + // private SocketSession session; private boolean done; Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketIoProcessor.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketIoProcessor.java?view=diff&rev=124943&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketIoProcessor.java&r1=124942&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketIoProcessor.java&r2=124943 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketIoProcessor.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketIoProcessor.java Tue Jan 11 23:13:42 2005 @@ -269,8 +269,8 @@ } catch( Throwable e ) { - if ( e instanceof IOException ) - scheduleRemove( session ); + if( e instanceof IOException ) + scheduleRemove( session ); session.getFilterManager().fireExceptionCaught( session, e ); } } @@ -337,8 +337,8 @@ long idleTime, IdleStatus status, long lastIoTime ) { - if( idleTime > 0 && !session.isIdle( status ) - && lastIoTime != 0 && ( currentTime - lastIoTime ) >= idleTime ) + if( idleTime > 0 && !session.isIdle( status ) && lastIoTime != 0 + && ( currentTime - lastIoTime ) >= idleTime ) { session.setIdle( status, true ); session.getFilterManager().fireSessionIdle( session, status ); @@ -385,7 +385,7 @@ } catch( IOException e ) { - scheduleRemove( session ); + scheduleRemove( session ); session.getFilterManager().fireExceptionCaught( session, e ); } } Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketSession.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketSession.java?view=diff&rev=124943&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketSession.java&r1=124942&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketSession.java&r2=124943 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketSession.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketSession.java Tue Jan 11 23:13:42 2005 @@ -30,6 +30,7 @@ import org.apache.mina.io.IoSession; import org.apache.mina.util.IoHandlerFilterManager; import org.apache.mina.util.Queue; +import org.apache.mina.util.IoHandlerFilterManager.WriteCommand; /** * TODO Insert type comment. @@ -57,6 +58,8 @@ private final SocketAddress localAddress; + private final WriteCommand writeCommand = new WriteCommandImpl(); + private SelectionKey key; private Object attachment; @@ -155,13 +158,7 @@ public void write( ByteBuffer buf, Object marker ) { - synchronized( writeBufferQueue ) - { - writeBufferQueue.push( buf ); - writeMarkerQueue.push( marker ); - } - - SocketIoProcessor.getInstance().flushSession( this ); + filterManager.write( writeCommand, buf, marker ); } public TransportType getTransportType() @@ -251,5 +248,19 @@ else throw new IllegalArgumentException( "Unknown idle status: " + status ); + } + + private class WriteCommandImpl implements WriteCommand + { + public void execute( ByteBuffer buf, Object marker ) + { + synchronized( writeBufferQueue ) + { + writeBufferQueue.push( buf ); + writeMarkerQueue.push( marker ); + } + + SocketIoProcessor.getInstance().flushSession( SocketSession.this ); + } } } Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/IoAdapter.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/IoAdapter.java?view=diff&rev=124943&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/IoAdapter.java&r1=124942&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/IoAdapter.java&r2=124943 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/IoAdapter.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/IoAdapter.java Tue Jan 11 23:13:42 2005 @@ -31,6 +31,7 @@ import org.apache.mina.io.IoSession; import org.apache.mina.util.ProtocolHandlerFilterManager; import org.apache.mina.util.Queue; +import org.apache.mina.util.ProtocolHandlerFilterManager.WriteCommand; /** * Adapts the specified [EMAIL PROTECTED] ProtocolProvider} to [EMAIL PROTECTED] IoHandler}. This is @@ -251,7 +252,7 @@ } } - private static class ProtocolSessionImpl implements ProtocolSession + private class ProtocolSessionImpl implements ProtocolSession { private final IoSession session; @@ -263,6 +264,8 @@ private final ProtocolDecoderOutputImpl decOut; + private final WriteCommand writeCommand = new WriteCommandImpl(); + private Object attachment; private ProtocolSessionImpl( IoSession session, @@ -306,12 +309,7 @@ public void write( Object message ) { - synchronized( writeQueue ) - { - writeQueue.push( message ); - } - - adapter.write( session ); + filterManager.write( writeCommand, message ); } public TransportType getTransportType() @@ -367,6 +365,19 @@ public boolean isIdle( IdleStatus status ) { return session.isIdle( status ); + } + + private class WriteCommandImpl implements WriteCommand + { + public void execute( Object message ) + { + synchronized( writeQueue ) + { + writeQueue.push( message ); + } + + adapter.write( session ); + } } } Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilter.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilter.java?view=diff&rev=124943&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilter.java&r1=124942&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilter.java&r2=124943 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilter.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilter.java Tue Jan 11 23:13:42 2005 @@ -18,6 +18,7 @@ */ package org.apache.mina.protocol; +import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IdleStatus; /** @@ -31,6 +32,12 @@ * <li>Message transformation (e.g. encryption and decryption, ...),</li> * <li>and many more.</li> * </ul> + * <p> + * <strong>Please NEVER implement your filters to wrap + * [EMAIL PROTECTED] ProtocolSession}s.</strong> Users can cache the reference to the + * session, which might malfunction if any filters are added or removed later. + * Please implement [EMAIL PROTECTED] #filterWrite(ByteBuffer)} method to override + * [EMAIL PROTECTED] ProtocolSession#write(Object)} method. * * @author Trustin Lee ([EMAIL PROTECTED]) * @version $Rev$, $Date$ @@ -76,4 +83,9 @@ */ void messageSent( ProtocolHandler nextHandler, ProtocolSession session, Object message ); + + /** + * Filters [EMAIL PROTECTED] ProtocolSession#write(Object)} method invocation. + */ + Object filterWrite( Object message ); } Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilterAdapter.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilterAdapter.java?view=diff&rev=124943&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilterAdapter.java&r1=124942&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilterAdapter.java&r2=124943 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilterAdapter.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilterAdapter.java Tue Jan 11 23:13:42 2005 @@ -65,4 +65,9 @@ { nextHandler.messageSent( session, message ); } + + public Object filterWrite( Object message ) + { + return message; + } } Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/codec/Asn1CodecDecoder.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/codec/Asn1CodecDecoder.java?view=diff&rev=124943&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/codec/Asn1CodecDecoder.java&r1=124942&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/codec/Asn1CodecDecoder.java&r2=124943 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/codec/Asn1CodecDecoder.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/codec/Asn1CodecDecoder.java Tue Jan 11 23:13:42 2005 @@ -19,31 +19,41 @@ * @author Trustin Lee ([EMAIL PROTECTED]) * @version $Rev$, $Date$, */ -public class Asn1CodecDecoder implements ProtocolDecoder { +public class Asn1CodecDecoder implements ProtocolDecoder +{ - private final StatefulDecoder decoder; - private final DecoderCallbackImpl callback = new DecoderCallbackImpl(); - - public Asn1CodecDecoder(StatefulDecoder decoder) { - decoder.setCallback(callback); - this.decoder = decoder; - } - - public void decode(ProtocolSession session, ByteBuffer in, - ProtocolDecoderOutput out) throws ProtocolViolationException { - callback.decOut = out; - try { - decoder.decode(in.buf()); - } catch (DecoderException e) { - throw new ProtocolViolationException("Failed to decode.", e); - } - } - - private class DecoderCallbackImpl implements DecoderCallback { - private ProtocolDecoderOutput decOut; - - public void decodeOccurred(StatefulDecoder decoder, Object decoded) { - decOut.write(decoded); - } - } -} + private final StatefulDecoder decoder; + + private final DecoderCallbackImpl callback = new DecoderCallbackImpl(); + + public Asn1CodecDecoder( StatefulDecoder decoder ) + { + decoder.setCallback( callback ); + this.decoder = decoder; + } + + public void decode( ProtocolSession session, ByteBuffer in, + ProtocolDecoderOutput out ) + throws ProtocolViolationException + { + callback.decOut = out; + try + { + decoder.decode( in.buf() ); + } + catch( DecoderException e ) + { + throw new ProtocolViolationException( "Failed to decode.", e ); + } + } + + private class DecoderCallbackImpl implements DecoderCallback + { + private ProtocolDecoderOutput decOut; + + public void decodeOccurred( StatefulDecoder decoder, Object decoded ) + { + decOut.write( decoded ); + } + } +} \ No newline at end of file Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/codec/Asn1CodecEncoder.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/codec/Asn1CodecEncoder.java?view=diff&rev=124943&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/codec/Asn1CodecEncoder.java&r1=124942&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/codec/Asn1CodecEncoder.java&r2=124943 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/codec/Asn1CodecEncoder.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/codec/Asn1CodecEncoder.java Tue Jan 11 23:13:42 2005 @@ -19,37 +19,54 @@ * @author Trustin Lee ([EMAIL PROTECTED]) * @version $Rev$, $Date$, */ -public class Asn1CodecEncoder implements ProtocolEncoder { - private final StatefulEncoder encoder; - private final EncoderCallbackImpl callback = new EncoderCallbackImpl(); +public class Asn1CodecEncoder implements ProtocolEncoder +{ + private final StatefulEncoder encoder; - public Asn1CodecEncoder(StatefulEncoder encoder) { - encoder.setCallback(callback); - this.encoder = encoder; - } + private final EncoderCallbackImpl callback = new EncoderCallbackImpl(); - public void encode(ProtocolSession session, Object message, ProtocolEncoderOutput out) throws ProtocolViolationException { - callback.encOut = out; - try { - encoder.encode(message); - } catch (EncoderException e) { - throw new ProtocolViolationException("Encoding failed.", e); - } - } - - private class EncoderCallbackImpl implements EncoderCallback { - private ProtocolEncoderOutput encOut; + public Asn1CodecEncoder( StatefulEncoder encoder ) + { + encoder.setCallback( callback ); + this.encoder = encoder; + } - public void encodeOccurred(StatefulEncoder codec, Object encoded) { - if (encoded instanceof java.nio.ByteBuffer) { - java.nio.ByteBuffer buf = (java.nio.ByteBuffer) encoded; - ByteBuffer outBuf = ByteBuffer.allocate(buf.remaining()); - outBuf.put(buf); - outBuf.flip(); - encOut.write(outBuf); - } else { - throw new IllegalArgumentException("Encoded result is not a ByteBuffer: " + encoded.getClass()); - } - } - } -} + public void encode( ProtocolSession session, Object message, + ProtocolEncoderOutput out ) + throws ProtocolViolationException + { + callback.encOut = out; + try + { + encoder.encode( message ); + } + catch( EncoderException e ) + { + throw new ProtocolViolationException( "Encoding failed.", e ); + } + } + + private class EncoderCallbackImpl implements EncoderCallback + { + private ProtocolEncoderOutput encOut; + + public void encodeOccurred( StatefulEncoder codec, Object encoded ) + { + if( encoded instanceof java.nio.ByteBuffer ) + { + java.nio.ByteBuffer buf = ( java.nio.ByteBuffer ) encoded; + ByteBuffer outBuf = ByteBuffer.allocate( buf.remaining() ); + outBuf.put( buf ); + outBuf.flip(); + encOut.write( outBuf ); + } + else + { + throw new IllegalArgumentException( + "Encoded result is not a ByteBuffer: " + + encoded + .getClass() ); + } + } + } +} \ No newline at end of file Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/codec/NettyDecoder.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/codec/NettyDecoder.java?view=diff&rev=124943&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/codec/NettyDecoder.java&r1=124942&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/codec/NettyDecoder.java&r2=124943 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/codec/NettyDecoder.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/codec/NettyDecoder.java Tue Jan 11 23:13:42 2005 @@ -21,89 +21,119 @@ * @author Trustin Lee ([EMAIL PROTECTED]) * @version $Rev$, $Date$, */ -public class NettyDecoder implements ProtocolDecoder { - private final MessageRecognizer recognizer; - - private java.nio.ByteBuffer readBuf = java.nio.ByteBuffer.allocate(1024); - - private Message readingMessage; - - public NettyDecoder(MessageRecognizer recognizer) { - if (recognizer == null) - throw new NullPointerException(); - - this.recognizer = recognizer; - } - - public void decode(ProtocolSession session, ByteBuffer in, - ProtocolDecoderOutput out) throws ProtocolViolationException { - - put(in); - - Message m = readingMessage; - try { - for (;;) { - readBuf.flip(); - if (m == null) { - int limit = readBuf.limit(); - boolean failed = true; - try { - m = recognizer.recognize(readBuf); - failed = false; - } finally { - if (failed) { - // clear the read buffer if failed to recognize - readBuf.clear(); - break; - } else { - if (m == null) { - readBuf.limit(readBuf.capacity()); - readBuf.position(limit); - break; // finish decoding - } else { - // reset buffer for read - readBuf.limit(limit); - readBuf.position(0); - } - } - } - } - - if (m != null) { - try { - if (m.read(readBuf)) { - out.write(m); - m = null; - } - } finally { - if (readBuf.hasRemaining()) { - readBuf.compact(); - } else { - readBuf.clear(); - } - } - } - } - } catch (MessageParseException e) { - m = null; // discard reading message - throw new ProtocolViolationException("Failed to decode.", e); - } - finally { - readingMessage = m; - } - } - - private void put(ByteBuffer in) { - // copy to read buffer - if (in.remaining() > readBuf.remaining()) - expand((readBuf.position() + in.remaining()) * 3 / 2); - readBuf.put(in.buf()); - } - - private void expand(int newCapacity) { - java.nio.ByteBuffer newBuf = java.nio.ByteBuffer.allocate(newCapacity); - readBuf.flip(); - newBuf.put(readBuf); - readBuf = newBuf; - } +public class NettyDecoder implements ProtocolDecoder +{ + private final MessageRecognizer recognizer; + + private java.nio.ByteBuffer readBuf = java.nio.ByteBuffer.allocate( 1024 ); + + private Message readingMessage; + + public NettyDecoder( MessageRecognizer recognizer ) + { + if( recognizer == null ) + throw new NullPointerException(); + + this.recognizer = recognizer; + } + + public void decode( ProtocolSession session, ByteBuffer in, + ProtocolDecoderOutput out ) + throws ProtocolViolationException + { + + put( in ); + + Message m = readingMessage; + try + { + for( ;; ) + { + readBuf.flip(); + if( m == null ) + { + int limit = readBuf.limit(); + boolean failed = true; + try + { + m = recognizer.recognize( readBuf ); + failed = false; + } + finally + { + if( failed ) + { + // clear the read buffer if failed to recognize + readBuf.clear(); + break; + } + else + { + if( m == null ) + { + readBuf.limit( readBuf.capacity() ); + readBuf.position( limit ); + break; // finish decoding + } + else + { + // reset buffer for read + readBuf.limit( limit ); + readBuf.position( 0 ); + } + } + } + } + + if( m != null ) + { + try + { + if( m.read( readBuf ) ) + { + out.write( m ); + m = null; + } + } + finally + { + if( readBuf.hasRemaining() ) + { + readBuf.compact(); + } + else + { + readBuf.clear(); + } + } + } + } + } + catch( MessageParseException e ) + { + m = null; // discard reading message + throw new ProtocolViolationException( "Failed to decode.", e ); + } + finally + { + readingMessage = m; + } + } + + private void put( ByteBuffer in ) + { + // copy to read buffer + if( in.remaining() > readBuf.remaining() ) + expand( ( readBuf.position() + in.remaining() ) * 3 / 2 ); + readBuf.put( in.buf() ); + } + + private void expand( int newCapacity ) + { + java.nio.ByteBuffer newBuf = java.nio.ByteBuffer + .allocate( newCapacity ); + readBuf.flip(); + newBuf.put( readBuf ); + readBuf = newBuf; + } } Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/codec/NettyEncoder.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/codec/NettyEncoder.java?view=diff&rev=124943&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/codec/NettyEncoder.java&r1=124942&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/codec/NettyEncoder.java&r2=124943 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/codec/NettyEncoder.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/codec/NettyEncoder.java Tue Jan 11 23:13:42 2005 @@ -19,33 +19,46 @@ * @author Trustin Lee ([EMAIL PROTECTED]) * @version $Rev$, $Date$, */ -public class NettyEncoder implements ProtocolEncoder { +public class NettyEncoder implements ProtocolEncoder +{ - public NettyEncoder() { - } + public NettyEncoder() + { + } - public void encode(ProtocolSession session, Object message, - ProtocolEncoderOutput out) throws ProtocolViolationException { - if (!(message instanceof Message)) { - throw new ProtocolViolationException( - "This encoder can decode only Netty Messages."); - } + public void encode( ProtocolSession session, Object message, + ProtocolEncoderOutput out ) + throws ProtocolViolationException + { + if( ! ( message instanceof Message ) ) + { + throw new ProtocolViolationException( + "This encoder can decode only Netty Messages." ); + } - for (;;) { - ByteBuffer buf = ByteBuffer.allocate(8192); - Message m = (Message) message; - try { - if (m.write(buf.buf())) { - break; - } - } finally { - buf.flip(); - if (buf.hasRemaining()) { - out.write(buf); - } else { - ByteBuffer.release(buf); - } - } - } - } + for( ;; ) + { + ByteBuffer buf = ByteBuffer.allocate( 8192 ); + Message m = ( Message ) message; + try + { + if( m.write( buf.buf() ) ) + { + break; + } + } + finally + { + buf.flip(); + if( buf.hasRemaining() ) + { + out.write( buf ); + } + else + { + ByteBuffer.release( buf ); + } + } + } + } } Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/filter/ProtocolThreadPoolFilter.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/filter/ProtocolThreadPoolFilter.java?view=diff&rev=124943&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/filter/ProtocolThreadPoolFilter.java&r1=124942&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/filter/ProtocolThreadPoolFilter.java&r2=124943 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/filter/ProtocolThreadPoolFilter.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/filter/ProtocolThreadPoolFilter.java Tue Jan 11 23:13:42 2005 @@ -516,4 +516,9 @@ } } } + + public Object filterWrite( Object message ) + { + return message; + } } Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/vmpipe/VmPipeSession.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/vmpipe/VmPipeSession.java?view=diff&rev=124943&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/vmpipe/VmPipeSession.java&r1=124942&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/vmpipe/VmPipeSession.java&r2=124943 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/vmpipe/VmPipeSession.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/vmpipe/VmPipeSession.java Tue Jan 11 23:13:42 2005 @@ -13,6 +13,7 @@ import org.apache.mina.protocol.ProtocolHandler; import org.apache.mina.protocol.ProtocolSession; import org.apache.mina.util.ProtocolHandlerFilterManager; +import org.apache.mina.util.ProtocolHandlerFilterManager.WriteCommand; /** * TODO Document me. @@ -30,14 +31,16 @@ private final ProtocolHandler localHandler; + private final VmPipeSessionConfig config = new VmPipeSessionConfig(); + + private final WriteCommand writeCommand = new WriteCommandImpl(); + final ProtocolHandlerFilterManager localFilterManager; final ProtocolHandlerFilterManager remoteFilterManager; final VmPipeSession remoteSession; - private final VmPipeSessionConfig config = new VmPipeSessionConfig(); - private Object attachment; boolean closed; @@ -130,12 +133,7 @@ public void write( Object message ) { - synchronized( lock ) - { - if( closed ) - throw new IllegalStateException( "Session is closed." ); - remoteFilterManager.fireMessageReceived( remoteSession, message ); - } + localFilterManager.write( writeCommand, message ); } public TransportType getTransportType() @@ -202,4 +200,19 @@ throw new IllegalArgumentException( "Illegal statue: " + status ); } + + private class WriteCommandImpl implements WriteCommand + { + public void execute( Object message ) + { + synchronized( lock ) + { + if( closed ) + throw new IllegalStateException( "Session is closed." ); + remoteFilterManager.fireMessageReceived( remoteSession, + message ); + } + } + } + } Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java?view=diff&rev=124943&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java&r1=124942&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java&r2=124943 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java Tue Jan 11 23:13:42 2005 @@ -18,6 +18,9 @@ */ package org.apache.mina.util; +import java.util.ArrayList; +import java.util.List; + import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IdleStatus; import org.apache.mina.io.IoHandler; @@ -75,9 +78,17 @@ { session.getHandler().dataWritten( session, marker ); } + + public ByteBuffer filterWrite( ByteBuffer buf ) + { + return buf; + } }; - private Entry head = new Entry( null, Integer.MIN_VALUE, FINAL_FILTER ); + private Entry head = new Entry( null, null, Integer.MIN_VALUE, + FINAL_FILTER ); + + private final Entry tail = head; public IoHandlerFilterManager() { @@ -89,31 +100,26 @@ Entry prevEntry = null; for( ;; ) { - if( e.nextEntry == null ) + if( e.priority < priority ) { - Entry newEntry = new Entry( e, priority, filter ); + Entry newEntry = new Entry( prevEntry, e, priority, filter ); if( prevEntry == null ) { head = newEntry; } else { + prevEntry.nextEntry.prevEntry = newEntry; prevEntry.nextEntry = newEntry; } break; } - else if( e.priority < priority ) + else if( e.priority == priority ) { - Entry newEntry = new Entry( e, priority, filter ); - if( prevEntry == null ) - { - head = newEntry; - } - else - { - prevEntry.nextEntry = newEntry; - } - break; + throw new IllegalArgumentException( + "Other filter is registered with priority " + + priority + + " already." ); } prevEntry = e; e = e.nextEntry; @@ -135,10 +141,12 @@ if( prevEntry == null ) { // e is head + e.nextEntry.prevEntry = null; head = e.nextEntry; } else { + e.nextEntry.prevEntry = prevEntry; prevEntry.nextEntry = e.nextEntry; } break; @@ -226,8 +234,51 @@ } } + public void write( WriteCommand cmd, ByteBuffer buf, Object marker ) + { + Entry e = tail; + do + { + buf = e.filter.filterWrite( buf ); + e = e.prevEntry; + } + while( e != null ); + + cmd.execute( buf, marker ); + } + + List filters() + { + List list = new ArrayList(); + Entry e = head; + do + { + list.add( e.filter ); + e = e.nextEntry; + } + while( e != null ); + + return list; + } + + List filtersReversed() + { + List list = new ArrayList(); + Entry e = tail; + do + { + list.add( e.filter ); + e = e.prevEntry; + } + while( e != null ); + + return list; + } + private static class Entry { + private Entry prevEntry; + private Entry nextEntry; private final int priority; @@ -236,10 +287,12 @@ private final IoHandler nextHandler; - private Entry( Entry nextEntry, int priority, IoHandlerFilter filter ) + private Entry( Entry prevEntry, Entry nextEntry, int priority, + IoHandlerFilter filter ) { if( filter == null ) throw new NullPointerException( "filter" ); + this.prevEntry = prevEntry; this.nextEntry = nextEntry; this.priority = priority; this.filter = filter; @@ -336,5 +389,10 @@ } }; } + } + + public static interface WriteCommand + { + void execute( ByteBuffer buf, Object marker ); } } Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/ProtocolHandlerFilterManager.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/ProtocolHandlerFilterManager.java?view=diff&rev=124943&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/ProtocolHandlerFilterManager.java&r1=124942&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/ProtocolHandlerFilterManager.java&r2=124943 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/ProtocolHandlerFilterManager.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/ProtocolHandlerFilterManager.java Tue Jan 11 23:13:42 2005 @@ -18,6 +18,9 @@ */ package org.apache.mina.util; +import java.util.ArrayList; +import java.util.List; + import org.apache.mina.common.IdleStatus; import org.apache.mina.protocol.ProtocolHandler; import org.apache.mina.protocol.ProtocolHandlerFilter; @@ -69,9 +72,17 @@ { session.getHandler().messageSent( session, message ); } + + public Object filterWrite( Object message ) + { + return message; + } }; - private Entry head = new Entry( null, Integer.MIN_VALUE, FINAL_FILTER ); + private Entry head = new Entry( null, null, Integer.MIN_VALUE, + FINAL_FILTER ); + + private Entry tail = head; public ProtocolHandlerFilterManager() { @@ -84,31 +95,26 @@ Entry prevEntry = null; for( ;; ) { - if( e.nextEntry == null ) + if( e.priority < priority ) { - Entry newEntry = new Entry( e, priority, filter ); + Entry newEntry = new Entry( prevEntry, e, priority, filter ); if( prevEntry == null ) { head = newEntry; } else { + prevEntry.nextEntry.prevEntry = newEntry; prevEntry.nextEntry = newEntry; } break; } - else if( e.priority < priority ) + else if( e.priority == priority ) { - Entry newEntry = new Entry( e, priority, filter ); - if( prevEntry == null ) - { - head = newEntry; - } - else - { - prevEntry.nextEntry = newEntry; - } - break; + throw new IllegalArgumentException( + "Other filter is registered with priority " + + priority + + " already." ); } prevEntry = e; e = e.nextEntry; @@ -130,10 +136,12 @@ if( prevEntry == null ) { // e is head + e.nextEntry.prevEntry = null; head = e.nextEntry; } else { + e.nextEntry.prevEntry = prevEntry; prevEntry.nextEntry = e.nextEntry; } break; @@ -221,8 +229,51 @@ } } + public void write( WriteCommand cmd, Object message ) + { + Entry e = tail; + do + { + message = e.filter.filterWrite( message ); + e = e.prevEntry; + } + while( e != null ); + + cmd.execute( message ); + } + + List filters() + { + List list = new ArrayList(); + Entry e = head; + do + { + list.add( e.filter ); + e = e.nextEntry; + } + while( e != null ); + + return list; + } + + List filtersReversed() + { + List list = new ArrayList(); + Entry e = tail; + do + { + list.add( e.filter ); + e = e.prevEntry; + } + while( e != null ); + + return list; + } + private static class Entry { + private Entry prevEntry; + private Entry nextEntry; private final int priority; @@ -231,11 +282,12 @@ private final ProtocolHandler nextHandler; - private Entry( Entry nextEntry, int priority, + private Entry( Entry prevEntry, Entry nextEntry, int priority, ProtocolHandlerFilter filter ) { if( filter == null ) throw new NullPointerException( "filter" ); + this.prevEntry = prevEntry; this.nextEntry = nextEntry; this.priority = priority; this.filter = filter; @@ -337,5 +389,10 @@ } }; } + } + + public static interface WriteCommand + { + void execute( Object message ); } } Added: incubator/directory/network/trunk/mina/src/test/org/apache/mina/util/IoHandlerFilterImpl.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/test/org/apache/mina/util/IoHandlerFilterImpl.java?view=auto&rev=124943 ============================================================================== --- (empty file) +++ incubator/directory/network/trunk/mina/src/test/org/apache/mina/util/IoHandlerFilterImpl.java Tue Jan 11 23:13:42 2005 @@ -0,0 +1,41 @@ +/* + * @(#) $Id$ + */ +package org.apache.mina.util; + +import org.apache.mina.io.IoHandlerFilterAdapter; + +/** + * TODO Document me. + * + * @author Trustin Lee ([EMAIL PROTECTED]) + * @version $Rev$, $Date$ + */ +public class IoHandlerFilterImpl extends IoHandlerFilterAdapter +{ + private final char c; + + public IoHandlerFilterImpl( char c ) + { + this.c = c; + } + + public int hashCode() + { + return c; + } + + public boolean equals( Object o ) + { + if( o == null ) + return false; + if( ! ( o instanceof IoHandlerFilterImpl ) ) + return false; + return this.c == ( ( IoHandlerFilterImpl ) o ).c; + } + + public String toString() + { + return "" + c; + } +} \ No newline at end of file Added: incubator/directory/network/trunk/mina/src/test/org/apache/mina/util/IoHandlerFilterManagerTest.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/test/org/apache/mina/util/IoHandlerFilterManagerTest.java?view=auto&rev=124943 ============================================================================== --- (empty file) +++ incubator/directory/network/trunk/mina/src/test/org/apache/mina/util/IoHandlerFilterManagerTest.java Tue Jan 11 23:13:42 2005 @@ -0,0 +1,129 @@ +/* + * @(#) $Id$ + */ +package org.apache.mina.util; + +import java.util.List; + +import junit.framework.TestCase; + +/** + * TODO Document me. + * + * @author Trustin Lee ([EMAIL PROTECTED]) + * @version $Rev$, $Date$ + */ +public class IoHandlerFilterManagerTest extends TestCase +{ + private IoHandlerFilterManager manager; + + private IoHandlerFilterImpl filterA; + + private IoHandlerFilterImpl filterB; + + private IoHandlerFilterImpl filterC; + + private IoHandlerFilterImpl filterD; + + private IoHandlerFilterImpl filterE; + + public void setUp() + { + manager = new IoHandlerFilterManager(); + filterA = new IoHandlerFilterImpl( 'A' ); + filterB = new IoHandlerFilterImpl( 'B' ); + filterC = new IoHandlerFilterImpl( 'C' ); + filterD = new IoHandlerFilterImpl( 'D' ); + filterE = new IoHandlerFilterImpl( 'E' ); + manager.addFilter( 0, filterA ); + manager.addFilter( -2, filterB ); + manager.addFilter( 2, filterC ); + manager.addFilter( -1, filterD ); + manager.addFilter( 1, filterE ); + } + + public void testAdd() + { + List list; + list = manager.filters(); + assertEquals( 6, list.size() ); + assertSame( filterC, list.get( 0 ) ); + assertSame( filterE, list.get( 1 ) ); + assertSame( filterA, list.get( 2 ) ); + assertSame( filterD, list.get( 3 ) ); + assertSame( filterB, list.get( 4 ) ); + + list = manager.filtersReversed(); + assertEquals( 6, list.size() ); + assertSame( filterC, list.get( 5 ) ); + assertSame( filterE, list.get( 4 ) ); + assertSame( filterA, list.get( 3 ) ); + assertSame( filterD, list.get( 2 ) ); + assertSame( filterB, list.get( 1 ) ); + } + + public void testRemoveFirst() + { + manager.removeFilter( filterC ); + + List list; + list = manager.filters(); + assertEquals( 5, list.size() ); + assertSame( filterE, list.get( 0 ) ); + assertSame( filterA, list.get( 1 ) ); + assertSame( filterD, list.get( 2 ) ); + assertSame( filterB, list.get( 3 ) ); + + list = manager.filtersReversed(); + assertEquals( 5, list.size() ); + assertSame( filterE, list.get( 4 ) ); + assertSame( filterA, list.get( 3 ) ); + assertSame( filterD, list.get( 2 ) ); + assertSame( filterB, list.get( 1 ) ); + } + + public void testRemoveLast() + { + manager.removeFilter( filterB ); + + List list; + list = manager.filters(); + assertEquals( 5, list.size() ); + assertSame( filterC, list.get( 0 ) ); + assertSame( filterE, list.get( 1 ) ); + assertSame( filterA, list.get( 2 ) ); + assertSame( filterD, list.get( 3 ) ); + + list = manager.filtersReversed(); + assertEquals( 5, list.size() ); + assertSame( filterC, list.get( 4 ) ); + assertSame( filterE, list.get( 3 ) ); + assertSame( filterA, list.get( 2 ) ); + assertSame( filterD, list.get( 1 ) ); + } + + public void testRemoveMiddle() + { + manager.removeFilter( filterA ); + + List list; + list = manager.filters(); + assertEquals( 5, list.size() ); + assertSame( filterC, list.get( 0 ) ); + assertSame( filterE, list.get( 1 ) ); + assertSame( filterD, list.get( 2 ) ); + assertSame( filterB, list.get( 3 ) ); + + list = manager.filtersReversed(); + assertEquals( 5, list.size() ); + assertSame( filterC, list.get( 4 ) ); + assertSame( filterE, list.get( 3 ) ); + assertSame( filterD, list.get( 2 ) ); + assertSame( filterB, list.get( 1 ) ); + } + + public static void main( String[] args ) + { + junit.textui.TestRunner.run( IoHandlerFilterManagerTest.class ); + } +} \ No newline at end of file Added: incubator/directory/network/trunk/mina/src/test/org/apache/mina/util/ProtocolHandlerFilterImpl.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/test/org/apache/mina/util/ProtocolHandlerFilterImpl.java?view=auto&rev=124943 ============================================================================== --- (empty file) +++ incubator/directory/network/trunk/mina/src/test/org/apache/mina/util/ProtocolHandlerFilterImpl.java Tue Jan 11 23:13:42 2005 @@ -0,0 +1,41 @@ +/* + * @(#) $Id$ + */ +package org.apache.mina.util; + +import org.apache.mina.protocol.ProtocolHandlerFilterAdapter; + +/** + * TODO Document me. + * + * @author Trustin Lee ([EMAIL PROTECTED]) + * @version $Rev$, $Date$ + */ +public class ProtocolHandlerFilterImpl extends ProtocolHandlerFilterAdapter +{ + private final char c; + + public ProtocolHandlerFilterImpl( char c ) + { + this.c = c; + } + + public int hashCode() + { + return c; + } + + public boolean equals( Object o ) + { + if( o == null ) + return false; + if( ! ( o instanceof ProtocolHandlerFilterImpl ) ) + return false; + return this.c == ( ( ProtocolHandlerFilterImpl ) o ).c; + } + + public String toString() + { + return "" + c; + } +} \ No newline at end of file Added: incubator/directory/network/trunk/mina/src/test/org/apache/mina/util/ProtocolHandlerFilterManagerTest.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/test/org/apache/mina/util/ProtocolHandlerFilterManagerTest.java?view=auto&rev=124943 ============================================================================== --- (empty file) +++ incubator/directory/network/trunk/mina/src/test/org/apache/mina/util/ProtocolHandlerFilterManagerTest.java Tue Jan 11 23:13:42 2005 @@ -0,0 +1,129 @@ +/* + * @(#) $Id$ + */ +package org.apache.mina.util; + +import java.util.List; + +import junit.framework.TestCase; + +/** + * TODO Document me. + * + * @author Trustin Lee ([EMAIL PROTECTED]) + * @version $Rev$, $Date$ + */ +public class ProtocolHandlerFilterManagerTest extends TestCase +{ + private ProtocolHandlerFilterManager manager; + + private ProtocolHandlerFilterImpl filterA; + + private ProtocolHandlerFilterImpl filterB; + + private ProtocolHandlerFilterImpl filterC; + + private ProtocolHandlerFilterImpl filterD; + + private ProtocolHandlerFilterImpl filterE; + + public void setUp() + { + manager = new ProtocolHandlerFilterManager(); + filterA = new ProtocolHandlerFilterImpl( 'A' ); + filterB = new ProtocolHandlerFilterImpl( 'B' ); + filterC = new ProtocolHandlerFilterImpl( 'C' ); + filterD = new ProtocolHandlerFilterImpl( 'D' ); + filterE = new ProtocolHandlerFilterImpl( 'E' ); + manager.addFilter( 0, filterA ); + manager.addFilter( -2, filterB ); + manager.addFilter( 2, filterC ); + manager.addFilter( -1, filterD ); + manager.addFilter( 1, filterE ); + } + + public void testAdd() + { + List list; + list = manager.filters(); + assertEquals( 6, list.size() ); + assertSame( filterC, list.get( 0 ) ); + assertSame( filterE, list.get( 1 ) ); + assertSame( filterA, list.get( 2 ) ); + assertSame( filterD, list.get( 3 ) ); + assertSame( filterB, list.get( 4 ) ); + + list = manager.filtersReversed(); + assertEquals( 6, list.size() ); + assertSame( filterC, list.get( 5 ) ); + assertSame( filterE, list.get( 4 ) ); + assertSame( filterA, list.get( 3 ) ); + assertSame( filterD, list.get( 2 ) ); + assertSame( filterB, list.get( 1 ) ); + } + + public void testRemoveFirst() + { + manager.removeFilter( filterC ); + + List list; + list = manager.filters(); + assertEquals( 5, list.size() ); + assertSame( filterE, list.get( 0 ) ); + assertSame( filterA, list.get( 1 ) ); + assertSame( filterD, list.get( 2 ) ); + assertSame( filterB, list.get( 3 ) ); + + list = manager.filtersReversed(); + assertEquals( 5, list.size() ); + assertSame( filterE, list.get( 4 ) ); + assertSame( filterA, list.get( 3 ) ); + assertSame( filterD, list.get( 2 ) ); + assertSame( filterB, list.get( 1 ) ); + } + + public void testRemoveLast() + { + manager.removeFilter( filterB ); + + List list; + list = manager.filters(); + assertEquals( 5, list.size() ); + assertSame( filterC, list.get( 0 ) ); + assertSame( filterE, list.get( 1 ) ); + assertSame( filterA, list.get( 2 ) ); + assertSame( filterD, list.get( 3 ) ); + + list = manager.filtersReversed(); + assertEquals( 5, list.size() ); + assertSame( filterC, list.get( 4 ) ); + assertSame( filterE, list.get( 3 ) ); + assertSame( filterA, list.get( 2 ) ); + assertSame( filterD, list.get( 1 ) ); + } + + public void testRemoveMiddle() + { + manager.removeFilter( filterA ); + + List list; + list = manager.filters(); + assertEquals( 5, list.size() ); + assertSame( filterC, list.get( 0 ) ); + assertSame( filterE, list.get( 1 ) ); + assertSame( filterD, list.get( 2 ) ); + assertSame( filterB, list.get( 3 ) ); + + list = manager.filtersReversed(); + assertEquals( 5, list.size() ); + assertSame( filterC, list.get( 4 ) ); + assertSame( filterE, list.get( 3 ) ); + assertSame( filterD, list.get( 2 ) ); + assertSame( filterB, list.get( 1 ) ); + } + + public static void main( String[] args ) + { + junit.textui.TestRunner.run( ProtocolHandlerFilterManagerTest.class ); + } +} \ No newline at end of file
