Author: trustin
Date: Fri Dec 24 00:57:32 2004
New Revision: 123285
URL: http://svn.apache.org/viewcvs?view=rev&rev=123285
Log:
* Added DatagramConnector
Added:
   incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramConnector.java   (contents, props changed)
Modified:
   incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramAcceptor.java
   incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramProcessor.java
   incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSession.java
Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramAcceptor.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramAcceptor.java?view=diff&rev=123285&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramAcceptor.java&r1=123284&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramAcceptor.java&r2=123285 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramAcceptor.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramAcceptor.java Fri Dec 24 00:57:32 2004 @@ -144,6 +144,10 @@ scheduleFlush( session ); selector.wakeup(); } + + public void closeSession( DatagramSession session ) + { + } private void scheduleFlush( DatagramSession session ) { @@ -182,12 +186,12 @@ } } - cancelKeys(); - if( nKeys > 0 ) { processReadySessions( selector.selectedKeys() ); } + + cancelKeys(); flushSessions(); } catch( IOException e ) @@ -221,9 +225,9 @@ DatagramAcceptor.this, filterManager, ch, - key, ( IoHandler ) key .attachment() ); + session.setSelectionKey(key); if( key.isReadable() ) { Added: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramConnector.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramConnector.java?view=auto&rev=123285 ============================================================================== --- (empty file) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramConnector.java Fri Dec 24 00:57:32 2004 @@ -0,0 +1,447 @@ +/* + * @(#) $Id$ + * + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this 
file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.mina.io.datagram; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.DatagramChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.util.Iterator; +import java.util.Set; + +import org.apache.mina.io.Connector; +import org.apache.mina.io.ExceptionMonitor; +import org.apache.mina.io.IoHandler; +import org.apache.mina.io.IoHandlerFilter; +import org.apache.mina.io.IoSession; +import org.apache.mina.util.DefaultExceptionMonitor; +import org.apache.mina.util.IoHandlerFilterManager; +import org.apache.mina.util.Queue; + +/** + * TODO Insert type comment. + * + * @author Trustin Lee ([EMAIL PROTECTED]) + * @version $Rev$, $Date$ + */ +public class DatagramConnector implements Connector, DatagramProcessor +{ + private static volatile int nextId = 0; + + private final IoHandlerFilterManager filterManager = new IoHandlerFilterManager(); + + private final int id = nextId++; + + private final Selector selector; + + private final Queue registerQueue = new Queue(); + + private final Queue cancelQueue = new Queue(); + + private final Queue flushingSessions = new Queue(); + + private ExceptionMonitor exceptionMonitor = new DefaultExceptionMonitor(); + + private Worker worker; + + /** + * Creates a new instance. 
+ * + * @throws IOException + */ + public DatagramConnector() throws IOException + { + selector = Selector.open(); + } + + public IoSession connect( SocketAddress address, IoHandler defaultHandler ) + throws IOException + { + if( address == null ) + throw new NullPointerException( "address" ); + if( defaultHandler == null ) + throw new NullPointerException( "defaultHandler" ); + + if( ! ( address instanceof InetSocketAddress ) ) + throw new IllegalArgumentException( "Unexpected address type: " + + address.getClass() ); + + DatagramChannel ch = DatagramChannel.open(); + ch.configureBlocking( false ); + ch.socket().connect( address ); + + RegistrationRequest request = new RegistrationRequest( ch, + defaultHandler ); + synchronized( this ) + { + synchronized( registerQueue ) + { + registerQueue.push( request ); + } + + if( worker == null ) + { + worker = new Worker(); + worker.start(); + } + } + + selector.wakeup(); + + synchronized( request ) + { + while( request.session == null ) + { + try + { + request.wait(); + } + catch( InterruptedException e ) + { + } + } + } + + return request.session; + } + + public IoSession connect( SocketAddress address, int timeout, + IoHandler defaultHandler ) throws IOException + { + return connect( address, defaultHandler ); + } + + public void closeSession( DatagramSession session ) + { + synchronized( this ) + { + SelectionKey key = session.getSelectionKey(); + synchronized( cancelQueue ) + { + cancelQueue.push( key ); + } + } + + selector.wakeup(); + } + + public void flushSession( DatagramSession session ) + { + scheduleFlush( session ); + selector.wakeup(); + } + + private void scheduleFlush( DatagramSession session ) + { + synchronized( flushingSessions ) + { + flushingSessions.push( session ); + } + } + + private class Worker extends Thread + { + public Worker() + { + super( "DatagramAcceptor-" + id ); + } + + public void run() + { + for( ;; ) + { + try + { + int nKeys = selector.select(); + + registerNew(); + + if( 
selector.keys().isEmpty() ) + { + synchronized( DatagramConnector.this ) + { + if( selector.keys().isEmpty() ) + { + worker = null; + break; + } + } + } + + if( nKeys > 0 ) + { + processReadySessions( selector.selectedKeys() ); + } + + flushSessions(); + cancelKeys(); + } + catch( IOException e ) + { + exceptionMonitor.exceptionCaught( DatagramConnector.this, + e ); + + try + { + Thread.sleep( 1000 ); + } + catch( InterruptedException e1 ) + { + } + } + } + } + } + + private void processReadySessions( Set keys ) + { + Iterator it = keys.iterator(); + while( it.hasNext() ) + { + SelectionKey key = ( SelectionKey ) it.next(); + it.remove(); + + DatagramSession session = ( DatagramSession ) key.attachment(); + + if( key.isReadable() ) + { + readSession( session ); + } + + if( key.isWritable() ) + { + scheduleFlush( session ); + } + } + } + + private void readSession( DatagramSession session ) + { + + ByteBuffer readBuf = ByteBuffer.allocate( 1500 ); + try + { + int readBytes = session.getChannel().read( readBuf ); + if( readBytes > 0 ) + { + readBuf.flip(); + filterManager.fireDataRead( session, readBuf ); + } + } + catch( IOException e ) + { + filterManager.fireExceptionCaught( session, e ); + } + } + + private void flushSessions() + { + if( flushingSessions.size() == 0 ) + return; + + for( ;; ) + { + DatagramSession session; + + synchronized( flushingSessions ) + { + session = ( DatagramSession ) flushingSessions.pop(); + } + + if( session == null ) + break; + + try + { + flush( session ); + } + catch( IOException e ) + { + session.getFilterManager().fireExceptionCaught( session, e ); + } + } + } + + private void flush( DatagramSession session ) throws IOException + { + DatagramChannel ch = session.getChannel(); + + Queue writeBufferQueue = session.getWriteBufferQueue(); + Queue writeMarkerQueue = session.getWriteMarkerQueue(); + + ByteBuffer buf; + Object marker; + for( ;; ) + { + synchronized( writeBufferQueue ) + { + buf = ( ByteBuffer ) writeBufferQueue.first(); 
+ marker = writeMarkerQueue.first(); + } + + if( buf == null ) + break; + + if( buf.remaining() == 0 ) + { + // pop and fire event + synchronized( writeBufferQueue ) + { + writeBufferQueue.pop(); + writeMarkerQueue.pop(); + } + session.getFilterManager().fireDataWritten( session, marker ); + continue; + } + + int writtenBytes = ch.write( buf ); + + SelectionKey key = session.getSelectionKey(); + if( writtenBytes == 0 ) + { + // Kernel buffer is full + key.interestOps( key.interestOps() | SelectionKey.OP_WRITE ); + } + else + { + key + .interestOps( key.interestOps() + & ( ~SelectionKey.OP_WRITE ) ); + + // pop and fire event + synchronized( writeBufferQueue ) + { + writeBufferQueue.pop(); + writeMarkerQueue.pop(); + } + session.getFilterManager().fireDataWritten( session, marker ); + } + } + } + + private void registerNew() throws ClosedChannelException + { + if( registerQueue.isEmpty() ) + return; + + for( ;; ) + { + RegistrationRequest req; + synchronized( registerQueue ) + { + req = ( RegistrationRequest ) registerQueue.pop(); + } + + if( req == null ) + break; + + DatagramSession session = new DatagramSession( this, + filterManager, + req.channel, + req.handler ); + + SelectionKey key = req.channel.register( selector, + SelectionKey.OP_READ, + session ); + + session.setSelectionKey( key ); + + synchronized( req ) + { + req.session = session; + req.notify(); + } + } + } + + private void cancelKeys() + { + if( cancelQueue.isEmpty() ) + return; + + for( ;; ) + { + SelectionKey key; + synchronized( cancelQueue ) + { + key = ( SelectionKey ) cancelQueue.pop(); + } + + if( key == null ) + break; + else + { + try + { + key.channel().close(); + } + catch( IOException e ) + { + exceptionMonitor.exceptionCaught( this, e ); + } + key.cancel(); + selector.wakeup(); // wake up again to trigger thread death + } + } + } + + public void addFilter( int priority, IoHandlerFilter filter ) + { + filterManager.addFilter( priority, filter ); + } + + public void removeFilter( 
IoHandlerFilter filter ) + { + filterManager.removeFilter( filter ); + } + + private static class RegistrationRequest + { + private final DatagramChannel channel; + + private final IoHandler handler; + + private DatagramSession session; + + private RegistrationRequest( DatagramChannel channel, IoHandler handler ) + { + this.channel = channel; + this.handler = handler; + } + } + + public ExceptionMonitor getExceptionMonitor() + { + return exceptionMonitor; + } + + public void setExceptionMonitor( ExceptionMonitor monitor ) + { + if( monitor == null ) + { + monitor = new DefaultExceptionMonitor(); + } + + this.exceptionMonitor = monitor; + } +} \ No newline at end of file Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramProcessor.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramProcessor.java?view=diff&rev=123285&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramProcessor.java&r1=123284&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramProcessor.java&r2=123285 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramProcessor.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramProcessor.java Fri Dec 24 00:57:32 2004 @@ -27,4 +27,6 @@ public interface DatagramProcessor { void flushSession( DatagramSession session ); + + void closeSession( DatagramSession session ); } Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSession.java Url: 
http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSession.java?view=diff&rev=123285&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSession.java&r1=123284&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSession.java&r2=123285 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSession.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSession.java Fri Dec 24 00:57:32 2004 @@ -54,10 +54,10 @@ private final SocketAddress localAddress; - private final SelectionKey key; - private SocketAddress remoteAddress; + private SelectionKey key; + private Object attachment; private long readBytes; @@ -77,8 +77,9 @@ /** * Creates a new instance. */ - DatagramSession( DatagramProcessor parent, IoHandlerFilterManager filterManager, DatagramChannel ch, - SelectionKey key, IoHandler defaultHandler ) + DatagramSession( DatagramProcessor parent, + IoHandlerFilterManager filterManager, DatagramChannel ch, + IoHandler defaultHandler ) { this.parent = parent; this.filterManager = filterManager; @@ -89,7 +90,6 @@ this.handler = defaultHandler; this.remoteAddress = ch.socket().getRemoteSocketAddress(); this.localAddress = ch.socket().getLocalSocketAddress(); - this.key = key; } IoHandlerFilterManager getFilterManager() @@ -107,6 +107,11 @@ return key; } + void setSelectionKey( SelectionKey key ) + { + this.key = key; + } + public IoHandler getHandler() { return handler; @@ -176,8 +181,8 @@ { return remoteAddress; } - - void setRemoteAddress(SocketAddress remoteAddress) + + void setRemoteAddress( SocketAddress remoteAddress ) { this.remoteAddress = remoteAddress; }
