Author: trustin Date: Fri Dec 24 00:33:05 2004 New Revision: 123284 URL: http://svn.apache.org/viewcvs?view=rev&rev=123284 Log: * Fixed: Deadlock when stopping thread pool filters * Added: DatagramChannel support * Removed: Filter lifecycle methods to make filters reusable between different acceptors and connectors * Added: TestCase for echo server example Added: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramAcceptor.java (contents, props changed) incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramProcessor.java (contents, props changed) incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSession.java (contents, props changed) incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSessionConfig.java (contents, props changed) Modified: incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/Main.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/filter/IoThreadPoolFilter.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketAcceptor.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketConnector.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/IoAdapter.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilter.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilterAdapter.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/filter/ProtocolThreadPoolFilter.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/ProtocolHandlerFilterManager.java incubator/directory/network/trunk/mina/src/test/org/apache/mina/examples/echoserver/Test.java
Modified: incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/Main.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/Main.java?view=diff&rev=123284&p1=incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/Main.java&r1=123283&p2=incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/Main.java&r2=123284 ============================================================================== --- incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/Main.java (original) +++ incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/Main.java Fri Dec 24 00:33:05 2004 @@ -21,6 +21,7 @@ import java.net.InetSocketAddress; import org.apache.mina.io.Acceptor; +import org.apache.mina.io.datagram.DatagramAcceptor; import org.apache.mina.io.filter.IoThreadPoolFilter; import org.apache.mina.io.socket.SocketAcceptor; @@ -37,9 +38,19 @@ public static void main( String[] args ) throws Exception { Acceptor acceptor = new SocketAcceptor(); - acceptor.addFilter( Integer.MAX_VALUE, new IoThreadPoolFilter() ); + + IoThreadPoolFilter threadPoolFilter = new IoThreadPoolFilter(); + threadPoolFilter.start(); + acceptor.addFilter( Integer.MAX_VALUE, threadPoolFilter ); + acceptor.bind( new InetSocketAddress( PORT ), new EchoProtocolHandler() ); + + Acceptor datagramAcceptor = new DatagramAcceptor(); + datagramAcceptor.addFilter( Integer.MAX_VALUE, threadPoolFilter ); + datagramAcceptor.bind( new InetSocketAddress( PORT ), + new EchoProtocolHandler() ); + System.out.println( "Listening on port " + PORT ); } } Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java?view=diff&rev=123284&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java&r1=123283&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java&r2=123284 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java Fri Dec 24 00:33:05 2004 @@ -30,10 +30,6 @@ */ public interface IoHandlerFilter { - void init(); - - void destroy(); - void sessionOpened( IoHandler nextHandler, IoSession session ); void sessionClosed( IoHandler nextHandler, IoSession session ); Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java?view=diff&rev=123284&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java&r1=123283&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java&r2=123284 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java Fri Dec 24 00:33:05 2004 @@ -30,14 +30,6 @@ */ public class IoHandlerFilterAdapter implements IoHandlerFilter { - public void init() - { - } - - public void destroy() - { - } - public void sessionOpened( IoHandler nextHandler, IoSession session ) { nextHandler.sessionOpened( session ); Added: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramAcceptor.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramAcceptor.java?view=auto&rev=123284 ============================================================================== --- (empty file) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramAcceptor.java Fri Dec 24 00:33:05 2004 @@ -0,0 +1,426 @@ +/* + * @(#) $Id$ + * + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.mina.io.datagram; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.DatagramChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +import org.apache.mina.io.Acceptor; +import org.apache.mina.io.ExceptionMonitor; +import org.apache.mina.io.IoHandler; +import org.apache.mina.io.IoHandlerFilter; +import org.apache.mina.util.DefaultExceptionMonitor; +import org.apache.mina.util.IoHandlerFilterManager; +import org.apache.mina.util.Queue; + +/** + * TODO Insert type comment. + * + * @author Trustin Lee ([EMAIL PROTECTED]) + * @version $Rev$, $Date$ + */ +public class DatagramAcceptor implements Acceptor, DatagramProcessor +{ + private static volatile int nextId = 0; + + private final IoHandlerFilterManager filterManager = new IoHandlerFilterManager(); + + private final int id = nextId++; + + private final Selector selector; + + private final Map channels = new HashMap(); + + private final Queue registerQueue = new Queue(); + + private final Queue cancelQueue = new Queue(); + + private final Queue flushingSessions = new Queue(); + + private ExceptionMonitor exceptionMonitor = new DefaultExceptionMonitor(); + + private Worker worker; + + /** + * Creates a new instance. + * + * @throws IOException + */ + public DatagramAcceptor() throws IOException + { + selector = Selector.open(); + } + + public void bind( SocketAddress address, IoHandler defaultHandler ) + throws IOException + { + if( address == null ) + throw new NullPointerException( "address" ); + if( defaultHandler == null ) + throw new NullPointerException( "defaultHandler" ); + + if( ! ( address instanceof InetSocketAddress ) ) + throw new IllegalArgumentException( "Unexpected address type: " + + address.getClass() ); + + DatagramChannel ch = DatagramChannel.open(); + ch.configureBlocking( false ); + ch.socket().bind( address ); + + synchronized( this ) + { + synchronized( registerQueue ) + { + registerQueue.push( new RegistrationRequest( ch, + defaultHandler ) ); + } + channels.put( address, ch ); + + if( worker == null ) + { + worker = new Worker(); + worker.start(); + } + } + + selector.wakeup(); + } + + public void unbind( SocketAddress address ) + { + if( address == null ) + throw new NullPointerException( "address" ); + + DatagramChannel ch; + + synchronized( this ) + { + ch = ( DatagramChannel ) channels.get( address ); + + if( ch == null ) + throw new IllegalArgumentException( "Unknown address: " + + address ); + + SelectionKey key = ch.keyFor( selector ); + channels.remove( address ); + synchronized( cancelQueue ) + { + cancelQueue.push( key ); + } + } + + selector.wakeup(); + ch.socket().close(); + } + + public void flushSession( DatagramSession session ) + { + scheduleFlush( session ); + selector.wakeup(); + } + + private void scheduleFlush( DatagramSession session ) + { + synchronized( flushingSessions ) + { + flushingSessions.push( session ); + } + } + + private class Worker extends Thread + { + public Worker() + { + super( "DatagramAcceptor-" + id ); + } + + public void run() + { + for( ;; ) + { + try + { + int nKeys = selector.select(); + + registerNew(); + + if( selector.keys().isEmpty() ) + { + synchronized( DatagramAcceptor.this ) + { + if( selector.keys().isEmpty() ) + { + worker = null; + break; + } + } + } + + cancelKeys(); + + if( nKeys > 0 ) + { + processReadySessions( selector.selectedKeys() ); + } + flushSessions(); + } + catch( IOException e ) + { + exceptionMonitor + .exceptionCaught( DatagramAcceptor.this, e ); + + try + { + Thread.sleep( 1000 ); + } + catch( InterruptedException e1 ) + { + } + } + } + } + } + + private void processReadySessions( Set keys ) + { + Iterator it = keys.iterator(); + while( it.hasNext() ) + { + SelectionKey key = ( SelectionKey ) it.next(); + it.remove(); + + DatagramChannel ch = ( DatagramChannel ) key.channel(); + + DatagramSession session = new DatagramSession( + DatagramAcceptor.this, + filterManager, + ch, + key, + ( IoHandler ) key + .attachment() ); + + if( key.isReadable() ) + { + readSession( session ); + } + + if( key.isWritable() ) + { + scheduleFlush( session ); + } + } + } + + private void readSession( DatagramSession session ) + { + + ByteBuffer readBuf = ByteBuffer.allocate( 1500 ); + try + { + SocketAddress remoteAddress = session.getChannel() + .receive( readBuf ); + if( remoteAddress != null ) + { + readBuf.flip(); + session.setRemoteAddress( remoteAddress ); + filterManager.fireDataRead( session, readBuf ); + } + } + catch( IOException e ) + { + filterManager.fireExceptionCaught( session, e ); + } + } + + private void flushSessions() + { + if( flushingSessions.size() == 0 ) + return; + + for( ;; ) + { + DatagramSession session; + + synchronized( flushingSessions ) + { + session = ( DatagramSession ) flushingSessions.pop(); + } + + if( session == null ) + break; + + try + { + flush( session ); + } + catch( IOException e ) + { + session.getFilterManager().fireExceptionCaught( session, e ); + } + } + } + + private void flush( DatagramSession session ) throws IOException + { + DatagramChannel ch = session.getChannel(); + + Queue writeBufferQueue = session.getWriteBufferQueue(); + Queue writeMarkerQueue = session.getWriteMarkerQueue(); + + ByteBuffer buf; + Object marker; + for( ;; ) + { + synchronized( writeBufferQueue ) + { + buf = ( ByteBuffer ) writeBufferQueue.first(); + marker = writeMarkerQueue.first(); + } + + if( buf == null ) + break; + + if( buf.remaining() == 0 ) + { + // pop and fire event + synchronized( writeBufferQueue ) + { + writeBufferQueue.pop(); + writeMarkerQueue.pop(); + } + session.getFilterManager().fireDataWritten( session, marker ); + continue; + } + + int writtenBytes = ch.send( buf, session.getRemoteAddress() ); + + SelectionKey key = session.getSelectionKey(); + if( writtenBytes == 0 ) + { + // Kernel buffer is full + key.interestOps( key.interestOps() | SelectionKey.OP_WRITE ); + } + else + { + key + .interestOps( key.interestOps() + & ( ~SelectionKey.OP_WRITE ) ); + + // pop and fire event + synchronized( writeBufferQueue ) + { + writeBufferQueue.pop(); + writeMarkerQueue.pop(); + } + session.getFilterManager().fireDataWritten( session, marker ); + } + } + } + + private void registerNew() throws ClosedChannelException + { + if( registerQueue.isEmpty() ) + return; + + for( ;; ) + { + RegistrationRequest req; + synchronized( registerQueue ) + { + req = ( RegistrationRequest ) registerQueue.pop(); + } + + if( req == null ) + break; + + req.channel.register( selector, SelectionKey.OP_READ, req.handler ); + } + } + + private void cancelKeys() + { + if( cancelQueue.isEmpty() ) + return; + + for( ;; ) + { + SelectionKey key; + synchronized( cancelQueue ) + { + key = ( SelectionKey ) cancelQueue.pop(); + } + + if( key == null ) + break; + else + { + key.cancel(); + selector.wakeup(); // wake up again to trigger thread death + } + } + } + + public void addFilter( int priority, IoHandlerFilter filter ) + { + filterManager.addFilter( priority, filter ); + } + + public void removeFilter( IoHandlerFilter filter ) + { + filterManager.removeFilter( filter ); + } + + private static class RegistrationRequest + { + private final DatagramChannel channel; + + private final IoHandler handler; + + private RegistrationRequest( DatagramChannel channel, IoHandler handler ) + { + this.channel = channel; + this.handler = handler; + } + } + + public ExceptionMonitor getExceptionMonitor() + { + return exceptionMonitor; + } + + public void setExceptionMonitor( ExceptionMonitor monitor ) + { + if( monitor == null ) + { + monitor = new DefaultExceptionMonitor(); + } + + this.exceptionMonitor = monitor; + } +} \ No newline at end of file Added: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramProcessor.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramProcessor.java?view=auto&rev=123284 ============================================================================== --- (empty file) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramProcessor.java Fri Dec 24 00:33:05 2004 @@ -0,0 +1,30 @@ +/* + * @(#) $Id$ + * + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.mina.io.datagram; + +/** + * TODO Document me. + * + * @author Trustin Lee ([EMAIL PROTECTED]) + * @version $Rev$, $Date$ + */ +public interface DatagramProcessor +{ + void flushSession( DatagramSession session ); +} \ No newline at end of file Added: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSession.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSession.java?view=auto&rev=123284 ============================================================================== --- (empty file) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSession.java Fri Dec 24 00:33:05 2004 @@ -0,0 +1,253 @@ +/* + * @(#) $Id$ + * + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.mina.io.datagram; + +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.nio.channels.SelectionKey; + +import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.SessionConfig; +import org.apache.mina.io.IoHandler; +import org.apache.mina.io.IoSession; +import org.apache.mina.util.IoHandlerFilterManager; +import org.apache.mina.util.Queue; + +/** + * TODO Insert type comment. + * + * @author Trustin Lee ([EMAIL PROTECTED]) + * @version $Rev$, $Date$ + */ +class DatagramSession implements IoSession +{ + private final DatagramProcessor parent; + + private final IoHandlerFilterManager filterManager; + + private final DatagramChannel ch; + + private final DatagramSessionConfig config; + + private final Queue writeBufferQueue; + + private final Queue writeMarkerQueue; + + private final IoHandler handler; + + private final SocketAddress localAddress; + + private final SelectionKey key; + + private SocketAddress remoteAddress; + + private Object attachment; + + private long readBytes; + + private long writtenBytes; + + private long lastReadTime; + + private long lastWriteTime; + + private boolean idleForBoth; + + private boolean idleForRead; + + private boolean idleForWrite; + + /** + * Creates a new instance. + */ + DatagramSession( DatagramProcessor parent, IoHandlerFilterManager filterManager, DatagramChannel ch, + SelectionKey key, IoHandler defaultHandler ) + { + this.parent = parent; + this.filterManager = filterManager; + this.ch = ch; + this.config = new DatagramSessionConfig( ch ); + this.writeBufferQueue = new Queue(); + this.writeMarkerQueue = new Queue(); + this.handler = defaultHandler; + this.remoteAddress = ch.socket().getRemoteSocketAddress(); + this.localAddress = ch.socket().getLocalSocketAddress(); + this.key = key; + } + + IoHandlerFilterManager getFilterManager() + { + return filterManager; + } + + DatagramChannel getChannel() + { + return ch; + } + + SelectionKey getSelectionKey() + { + return key; + } + + public IoHandler getHandler() + { + return handler; + } + + public void close() + { + } + + public Object getAttachment() + { + return attachment; + } + + public void setAttachment( Object attachment ) + { + this.attachment = attachment; + } + + Queue getWriteBufferQueue() + { + return writeBufferQueue; + } + + Queue getWriteMarkerQueue() + { + return writeMarkerQueue; + } + + public void write( byte[] buf, int offset, int length, Object marker ) + { + write( ByteBuffer.wrap( buf, offset, length ), marker ); + } + + public void write( byte[] buf, Object marker ) + { + write( ByteBuffer.wrap( buf ), marker ); + } + + public void write( ByteBuffer buf, Object marker ) + { + synchronized( writeBufferQueue ) + { + writeBufferQueue.push( buf ); + writeMarkerQueue.push( marker ); + } + + parent.flushSession( this ); + } + + public boolean isConnected() + { + return ch.isConnected(); + } + + public boolean isClosed() + { + return !isConnected(); + } + + public SessionConfig getConfig() + { + return config; + } + + public SocketAddress getRemoteAddress() + { + return remoteAddress; + } + + void setRemoteAddress(SocketAddress remoteAddress) + { + this.remoteAddress = remoteAddress; + } + + public SocketAddress getLocalAddress() + { + return localAddress; + } + + public long getReadBytes() + { + return readBytes; + } + + public long getWrittenBytes() + { + return writtenBytes; + } + + void increaseReadBytes( int increment ) + { + readBytes += increment; + lastReadTime = System.currentTimeMillis(); + } + + void increaseWrittenBytes( int increment ) + { + writtenBytes += increment; + lastWriteTime = System.currentTimeMillis(); + } + + public long getLastIoTime() + { + return Math.max( lastReadTime, lastWriteTime ); + } + + public long getLastReadTime() + { + return lastReadTime; + } + + public long getLastWriteTime() + { + return lastWriteTime; + } + + public boolean isIdle( IdleStatus status ) + { + if( status == IdleStatus.BOTH_IDLE ) + return idleForBoth; + + if( status == IdleStatus.READER_IDLE ) + return idleForRead; + + if( status == IdleStatus.WRITER_IDLE ) + return idleForWrite; + + throw new IllegalArgumentException( "Unknown idle status: " + status ); + } + + void setIdle( IdleStatus status ) + { + if( status == IdleStatus.BOTH_IDLE ) + idleForBoth = true; + else if( status == IdleStatus.READER_IDLE ) + idleForRead = true; + else if( status == IdleStatus.WRITER_IDLE ) + idleForWrite = true; + else + throw new IllegalArgumentException( "Unknown idle status: " + + status ); + } +} \ No newline at end of file Added: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSessionConfig.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSessionConfig.java?view=auto&rev=123284 ============================================================================== --- (empty file) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSessionConfig.java Fri Dec 24 00:33:05 2004 @@ -0,0 +1,60 @@ +/* + * @(#) $Id$ + * + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.mina.io.datagram; + +import java.net.SocketException; +import java.nio.channels.DatagramChannel; + +import org.apache.mina.util.BasicSessionConfig; + +/** + * TODO Document me. + * + * @author Trustin Lee ([EMAIL PROTECTED]) + * @version $Rev$, $Date$, + */ +public class DatagramSessionConfig extends BasicSessionConfig +{ + private final DatagramChannel ch; + + DatagramSessionConfig( DatagramChannel ch ) + { + this.ch = ch; + } + + public boolean getReuseAddress() throws SocketException + { + return ch.socket().getReuseAddress(); + } + + public void setReuseAddress( boolean on ) throws SocketException + { + ch.socket().setReuseAddress( on ); + } + + public int getTrafficClass() throws SocketException + { + return ch.socket().getTrafficClass(); + } + + public void setTrafficClass( int tc ) throws SocketException + { + ch.socket().setTrafficClass( tc ); + } +} \ No newline at end of file Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/filter/IoThreadPoolFilter.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/filter/IoThreadPoolFilter.java?view=diff&rev=123284&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/filter/IoThreadPoolFilter.java&r1=123283&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/filter/IoThreadPoolFilter.java&r2=123284 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/filter/IoThreadPoolFilter.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/filter/IoThreadPoolFilter.java Fri Dec 24 00:33:05 2004 @@ -63,10 +63,14 @@ private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME; + private boolean started; + private boolean shuttingDown; private int poolSize; + private final Object poolSizeLock = new Object(); + public IoThreadPoolFilter() { } @@ -98,17 +102,25 @@ this.keepAliveTime = keepAliveTime; } - public void init() + public synchronized void start() { + if( started ) + return; + shuttingDown = false; leader = new Worker(); leader.start(); leader.lead(); + + started = true; } - public void destroy() + public synchronized void stop() { + if( !started ) + return; + shuttingDown = true; Worker lastLeader = null; for( ;; ) @@ -131,16 +143,24 @@ lastLeader = leader; } + + started = false; } - private synchronized void increasePoolSize() + private void increasePoolSize() { - poolSize++; + synchronized( poolSizeLock ) + { + poolSize++; + } } - private synchronized void decreasePoolSize() + private void decreasePoolSize() { - poolSize--; + synchronized( poolSizeLock ) + { + poolSize--; + } } public void sessionOpened( IoHandler nextHandler, IoSession session ) Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketAcceptor.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketAcceptor.java?view=diff&rev=123284&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketAcceptor.java&r1=123283&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketAcceptor.java&r2=123284 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketAcceptor.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketAcceptor.java Fri Dec 24 00:33:05 2004 @@ -107,7 +107,6 @@ if( worker == null ) { - filterManager.start(); worker = new Worker(); worker.start(); } @@ -175,7 +174,6 @@ if( selector.keys().isEmpty() ) { worker = null; - filterManager.stop(); break; } } Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketConnector.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketConnector.java?view=diff&rev=123284&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketConnector.java&r1=123283&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketConnector.java&r2=123284 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketConnector.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketConnector.java Fri Dec 24 00:33:05 2004 @@ -104,7 +104,6 @@ if( worker == null ) { - filterManager.start(); worker = new Worker(); worker.start(); } @@ -233,7 +232,6 @@ { if( selector.keys().isEmpty() ) { - filterManager.stop(); worker = null; break; } Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/IoAdapter.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/IoAdapter.java?view=diff&rev=123284&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/IoAdapter.java&r1=123283&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/IoAdapter.java&r2=123284 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/IoAdapter.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/IoAdapter.java Fri Dec 24 00:33:05 2004 @@ -40,12 +40,6 @@ public IoAdapter() { - filterManager.start(); - } - - public void shutdown() - { - filterManager.stop(); } public void addFilter( int priority, ProtocolHandlerFilter filter ) @@ -82,33 +76,29 @@ public void sessionOpened( IoSession session ) { - ProtocolSession psession = new ProtocolSessionImpl( session, this ); - session.setAttachment( psession ); - filterManager.fireSessionOpened( psession ); + filterManager.fireSessionOpened( getProtocolSession( session ) ); } public void sessionClosed( IoSession session ) { - filterManager.fireSessionClosed( ( ProtocolSession ) session - .getAttachment() ); + filterManager.fireSessionClosed( getProtocolSession( session ) ); } public void sessionIdle( IoSession session, IdleStatus status ) { - filterManager.fireSessionIdle( ( ProtocolSession ) session - .getAttachment(), status ); + filterManager.fireSessionIdle( getProtocolSession( session ), + status ); } public void exceptionCaught( IoSession session, Throwable cause ) { - filterManager.fireExceptionCaught( ( ProtocolSession ) session - .getAttachment(), cause ); + filterManager.fireExceptionCaught( getProtocolSession( session ), + cause ); } public void dataRead( IoSession session, ByteBuffer in ) { - ProtocolSession psession = ( ProtocolSession ) session - .getAttachment(); + ProtocolSession psession = getProtocolSession( session ); Object result; try { @@ -176,6 +166,26 @@ { filterManager.fireExceptionCaught( psession, t ); } + } + + private ProtocolSession getProtocolSession( IoSession session ) + { + ProtocolSession psession = ( ProtocolSession ) session + .getAttachment(); + if( psession == null ) + { + synchronized( session ) + { + psession = ( ProtocolSession ) session.getAttachment(); + if( psession == null ) + { + psession = new ProtocolSessionImpl( session, this ); + session.setAttachment( psession ); + } + } + } + + return psession; } } Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilter.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilter.java?view=diff&rev=123284&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilter.java&r1=123283&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilter.java&r2=123284 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilter.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilter.java Fri Dec 24 00:33:05 2004 @@ -28,10 +28,6 @@ */ public interface ProtocolHandlerFilter { - void init(); - - void destroy(); - void sessionOpened( ProtocolHandler nextHandler, ProtocolSession session ); void sessionClosed( ProtocolHandler nextHandler, ProtocolSession session ); Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilterAdapter.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilterAdapter.java?view=diff&rev=123284&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilterAdapter.java&r1=123283&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilterAdapter.java&r2=123284 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilterAdapter.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilterAdapter.java Fri Dec 24 00:33:05 2004 @@ -28,14 +28,6 @@ */ public class ProtocolHandlerFilterAdapter implements ProtocolHandlerFilter { - public void init() - { - } - - public void destroy() - { - } - public void sessionOpened( ProtocolHandler nextHandler, ProtocolSession session ) { Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/filter/ProtocolThreadPoolFilter.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/filter/ProtocolThreadPoolFilter.java?view=diff&rev=123284&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/filter/ProtocolThreadPoolFilter.java&r1=123283&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/filter/ProtocolThreadPoolFilter.java&r2=123284 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/filter/ProtocolThreadPoolFilter.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/filter/ProtocolThreadPoolFilter.java Fri Dec 24 00:33:05 2004 @@ -62,17 +62,24 @@ private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME; + private boolean started; + private boolean shuttingDown; private int poolSize; + private final Object poolSizeLock = new Object(); + public ProtocolThreadPoolFilter() { } - public synchronized int getPoolSize() + public int getPoolSize() { - return poolSize; + synchronized( poolSizeLock ) + { + return poolSize; + } } public int getMaximumPoolSize() @@ -97,17 +104,25 @@ this.keepAliveTime = keepAliveTime; } - public void init() + public synchronized void start() { + if( started ) + return; + shuttingDown = false; leader = new Worker(); leader.start(); leader.lead(); + + started = true; } - public void destroy() + public synchronized void stop() { + if( !started ) + return; + shuttingDown = true; Worker lastLeader = null; for( ;; ) @@ -130,16 +145,24 @@ lastLeader = leader; } + + started = false; } - private synchronized void increasePoolSize() + private void increasePoolSize() { - poolSize++; + synchronized( poolSizeLock ) + { + poolSize++; + } } - private synchronized void decreasePoolSize() + private void decreasePoolSize() { - poolSize--; + synchronized( poolSizeLock ) + { + poolSize--; + } } public void sessionOpened( ProtocolHandler nextHandler, Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java?view=diff&rev=123284&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java&r1=123283&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java&r2=123284 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java Fri Dec 24 00:33:05 2004 @@ -69,60 +69,16 @@ { session.getHandler().dataWritten( session, marker ); } - - public void init() - { - } - - public void destroy() - { - } }; private Entry head = new Entry( null, Integer.MIN_VALUE, FINAL_FILTER ); - private boolean started; - public IoHandlerFilterManager() { } - public synchronized void start() - { - if( started ) - return; - - Entry e = head; - do - { - e.filter.init(); - e = e.nextEntry; - } - while( e != null ); - started = true; - } - - public synchronized void stop() - { - if( !started ) - return; - - Entry e = head; - do - { - e.filter.destroy(); - e = e.nextEntry; - } - while( e != null ); - } - public synchronized void addFilter( int priority, IoHandlerFilter filter ) { - if( started ) - { - filter.init(); - } - Entry e = head; Entry prevEntry = null; for( ;; ) @@ -178,11 +134,6 @@ else { prevEntry.nextEntry = e.nextEntry; - } - - if( started ) - { - filter.destroy(); } break; } Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/ProtocolHandlerFilterManager.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/ProtocolHandlerFilterManager.java?view=diff&rev=123284&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/ProtocolHandlerFilterManager.java&r1=123283&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/ProtocolHandlerFilterManager.java&r2=123284 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/ProtocolHandlerFilterManager.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/ProtocolHandlerFilterManager.java Fri Dec 24 00:33:05 2004 @@ -69,61 +69,17 @@ { session.getHandler().messageSent( session, message ); } - - public void init() - { - } - - public void destroy() - { - } }; private Entry head = new Entry( null, Integer.MIN_VALUE, FINAL_FILTER ); - private boolean started; - public ProtocolHandlerFilterManager() { } - public synchronized void start() - { - if( started ) - return; - - Entry e = head; - do - { - e.filter.init(); - e = e.nextEntry; - } - while( e != null ); - started = true; - } - - public synchronized void stop() - { - if( !started ) - return; - - Entry e = head; - do - { - e.filter.destroy(); - e = e.nextEntry; - } - while( e != null ); - } - public synchronized void addFilter( int priority, ProtocolHandlerFilter filter ) { - if( started ) - { - filter.init(); - } - Entry e = head; Entry prevEntry = null; for( ;; ) @@ -179,11 +135,6 @@ else { prevEntry.nextEntry = e.nextEntry; - } - - if( started ) - { - filter.destroy(); } break; } Modified: incubator/directory/network/trunk/mina/src/test/org/apache/mina/examples/echoserver/Test.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/test/org/apache/mina/examples/echoserver/Test.java?view=diff&rev=123284&p1=incubator/directory/network/trunk/mina/src/test/org/apache/mina/examples/echoserver/Test.java&r1=123283&p2=incubator/directory/network/trunk/mina/src/test/org/apache/mina/examples/echoserver/Test.java&r2=123284 ============================================================================== --- incubator/directory/network/trunk/mina/src/test/org/apache/mina/examples/echoserver/Test.java (original) +++ incubator/directory/network/trunk/mina/src/test/org/apache/mina/examples/echoserver/Test.java Fri Dec 24 00:33:05 2004 @@ -3,14 +3,16 @@ */ package org.apache.mina.examples.echoserver; -import java.io.InputStream; -import java.io.OutputStream; +import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.Socket; +import java.net.SocketTimeoutException; import junit.framework.TestCase; +import org.apache.commons.net.EchoTCPClient; +import org.apache.commons.net.EchoUDPClient; import org.apache.mina.io.Acceptor; +import org.apache.mina.io.datagram.DatagramAcceptor; import org.apache.mina.io.filter.IoThreadPoolFilter; import org.apache.mina.io.socket.SocketAcceptor; @@ -26,39 +28,143 @@ private Acceptor acceptor; + private Acceptor datagramAcceptor; + + private IoThreadPoolFilter threadPoolFilter; + + public static void assertEquals( byte[] expected, byte[] actual ) + { + assertEquals( toString( expected ), toString( actual ) ); + } + + private static String toString( byte[] buf ) + { + StringBuffer str = new StringBuffer( buf.length * 4 ); + for( int i = 0; i < buf.length; i++ ) + { + str.append( buf[ i ] ); + str.append( ' ' ); + } + return str.toString(); + } + protected void setUp() throws Exception { + threadPoolFilter = new IoThreadPoolFilter(); + threadPoolFilter.start(); + acceptor = new SocketAcceptor(); acceptor.bind( new InetSocketAddress( PORT ), new EchoProtocolHandler() ); - acceptor.addFilter( Integer.MAX_VALUE, new IoThreadPoolFilter() ); + acceptor.addFilter( Integer.MAX_VALUE, threadPoolFilter ); + + datagramAcceptor = new DatagramAcceptor(); + datagramAcceptor.addFilter( Integer.MAX_VALUE, threadPoolFilter ); + datagramAcceptor.bind( new InetSocketAddress( PORT ), + new EchoProtocolHandler() ); } protected void tearDown() throws Exception { acceptor.unbind( new InetSocketAddress( PORT ) ); + datagramAcceptor.unbind( new InetSocketAddress( PORT ) ); + threadPoolFilter.stop(); } - public void testPool() throws Exception + public void testTCP() throws Exception { - Socket s = new Socket( "localhost", PORT ); - InputStream in = s.getInputStream(); - OutputStream out = s.getOutputStream(); - try + EchoTCPClient client = new EchoTCPClient(); + client.connect( InetAddress.getLocalHost(), PORT ); + client.setSoTimeout( 3000 ); + + byte[] writeBuf = new byte[ 16 ]; + + for( int i = 0; i < 10; i++ ) { - for( int i = 0; i < 1024; i++ ) + fillWriteBuffer( writeBuf, i ); + client.getOutputStream().write( writeBuf ); + } + + byte[] readBuf = new byte[ writeBuf.length ]; + + for( int i = 0; i < 10; i++ ) + { + fillWriteBuffer( writeBuf, i ); + + int readBytes = 0; + while( readBytes < readBuf.length ) { - int b = ( ( byte ) i ) & 0xff; - System.out.println( "Test: " + b ); - out.write( b ); - assertEquals( b, in.read() ); + int nBytes = client.getInputStream() + .read( readBuf, readBytes, readBuf.length - readBytes ); + + if( nBytes < 0 ) + fail( "Unexpected disconnection." ); + + readBytes += nBytes; } + + assertEquals( writeBuf, readBuf ); + } + + client.setSoTimeout( 500 ); + + try + { + client.getInputStream().read(); + fail( "Unexpected incoming data." ); + } + catch( SocketTimeoutException e ) + { + } + + client.disconnect(); + } + + public void testUDP() throws Exception + { + EchoUDPClient client = new EchoUDPClient(); + client.open(); + client.setSoTimeout( 3000 ); + + byte[] writeBuf = new byte[ 16 ]; + + for( int i = 0; i < 10; i++ ) + { + fillWriteBuffer( writeBuf, i ); + client.send( writeBuf, writeBuf.length, + InetAddress.getLocalHost(), PORT ); } - finally + + byte[] readBuf = new byte[ writeBuf.length ]; + + for( int i = 0; i < 10; i++ ) + { + fillWriteBuffer( writeBuf, i ); + + assertEquals( readBuf.length, client.receive( readBuf, + readBuf.length ) ); + assertEquals( writeBuf, readBuf ); + } + + client.setSoTimeout( 500 ); + + try + { + client.receive( readBuf ); + fail( "Unexpected incoming data." ); + } + catch( SocketTimeoutException e ) + { + } + + client.close(); + } + + private void fillWriteBuffer( byte[] writeBuf, int i ) + { + for( int j = writeBuf.length - 1; j >= 0; j-- ) { - in.close(); - out.close(); - s.close(); + writeBuf[ j ] = ( byte ) ( j + i ); } }
