Author: trustin
Date: Thu Sep 27 01:32:13 2007
New Revision: 579942

URL: http://svn.apache.org/viewvc?rev=579942&view=rev
Log:
* Reverted DatagramAcceptor because my experimental implementation behaves 
unpredictably on some platforms.

Added:
    
mina/trunk/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java
   (with props)
    mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java 
  (with props)
    mina/trunk/core/src/main/java/org/apache/mina/util/ExpirationListener.java  
 (with props)
    mina/trunk/core/src/main/java/org/apache/mina/util/ExpiringMap.java   (with 
props)
    
mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java
   (with props)
Modified:
    
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java
    
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
    
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java
    
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
    
mina/trunk/core/src/test/java/org/apache/mina/transport/AbstractTrafficControlTest.java

Added: 
mina/trunk/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java?rev=579942&view=auto
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java
 (added)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java
 Thu Sep 27 01:32:13 2007
@@ -0,0 +1,115 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.common;
+
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.mina.util.ExpirationListener;
+import org.apache.mina.util.ExpiringMap;
+
+/**
+ * An [EMAIL PROTECTED] IoSessionRecycler} with sessions that time out on 
inactivity.
+ *
+ * TODO Document me.
+ *
+ * @author The Apache MINA Project ([EMAIL PROTECTED])
+ * @version $Rev$, $Date$
+ */
+public class ExpiringSessionRecycler implements IoSessionRecycler {
+    private ExpiringMap<Object, IoSession> sessionMap;
+
+    private ExpiringMap<Object, IoSession>.Expirer mapExpirer;
+
+    public ExpiringSessionRecycler() {
+        this(ExpiringMap.DEFAULT_TIME_TO_LIVE);
+    }
+
+    public ExpiringSessionRecycler(int timeToLive) {
+        this(timeToLive, ExpiringMap.DEFAULT_EXPIRATION_INTERVAL);
+    }
+
+    public ExpiringSessionRecycler(int timeToLive, int expirationInterval) {
+        sessionMap = new ExpiringMap<Object, IoSession>(timeToLive,
+                expirationInterval);
+        mapExpirer = sessionMap.getExpirer();
+        sessionMap.addExpirationListener(new DefaultExpirationListener());
+    }
+
+    public void put(IoSession session) {
+        mapExpirer.startExpiringIfNotStarted();
+
+        Object key = generateKey(session);
+
+        if (!sessionMap.containsKey(key)) {
+            sessionMap.put(key, session);
+        }
+    }
+
+    public IoSession recycle(SocketAddress localAddress,
+            SocketAddress remoteAddress) {
+        return sessionMap.get(generateKey(localAddress, remoteAddress));
+    }
+
+    public void remove(IoSession session) {
+        sessionMap.remove(generateKey(session));
+    }
+
+    public void stopExpiring() {
+        mapExpirer.stopExpiring();
+    }
+
+    public int getExpirationInterval() {
+        return sessionMap.getExpirationInterval();
+    }
+
+    public int getTimeToLive() {
+        return sessionMap.getTimeToLive();
+    }
+
+    public void setExpirationInterval(int expirationInterval) {
+        sessionMap.setExpirationInterval(expirationInterval);
+    }
+
+    public void setTimeToLive(int timeToLive) {
+        sessionMap.setTimeToLive(timeToLive);
+    }
+
+    private Object generateKey(IoSession session) {
+        return generateKey(session.getLocalAddress(), session
+                .getRemoteAddress());
+    }
+
+    private Object generateKey(SocketAddress localAddress,
+            SocketAddress remoteAddress) {
+        List<SocketAddress> key = new ArrayList<SocketAddress>(2);
+        key.add(remoteAddress);
+        key.add(localAddress);
+        return key;
+    }
+
+    private class DefaultExpirationListener implements
+            ExpirationListener<IoSession> {
+        public void expired(IoSession expiredSession) {
+            expiredSession.close();
+        }
+    }
+}

Propchange: 
mina/trunk/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
mina/trunk/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: 
mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java?rev=579942&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java 
(added)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java 
Thu Sep 27 01:32:13 2007
@@ -0,0 +1,78 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.common;
+
+import java.net.SocketAddress;
+
+/**
+ * A connectionless transport can recycle existing sessions by assigning an
+ * [EMAIL PROTECTED] IoSessionRecycler} to an [EMAIL PROTECTED] IoService}.
+ *
+ * @author The Apache MINA Project ([EMAIL PROTECTED])
+ * TODO More documentation
+ */
+public interface IoSessionRecycler {
+    /**
+     * A dummy recycler that doesn't recycle any sessions.  Using this 
recycler will
+     * make all session lifecycle events to be fired for every I/O for all 
connectionless
+     * sessions.
+     */
+    static IoSessionRecycler NOOP = new IoSessionRecycler() {
+        public void put(IoSession session) {
+        }
+
+        public IoSession recycle(SocketAddress localAddress,
+                SocketAddress remoteAddress) {
+            return null;
+        }
+
+        public void remove(IoSession session) {
+        }
+    };
+
+    /**
+     * Called when the underlying transport creates or writes a new [EMAIL 
PROTECTED] IoSession}.
+     *
+     * @param session
+     *            the new [EMAIL PROTECTED] IoSession}.
+     */
+    void put(IoSession session);
+
+    /**
+     * Attempts to retrieve a recycled [EMAIL PROTECTED] IoSession}.
+     *
+     * @param localAddress
+     *            the local socket address of the [EMAIL PROTECTED] IoSession} 
the
+     *            transport wants to recycle.
+     * @param remoteAddress
+     *            the remote socket address of the [EMAIL PROTECTED] 
IoSession} the
+     *            transport wants to recycle.
+     * @return a recycled [EMAIL PROTECTED] IoSession}, or null if one cannot 
be found.
+     */
+    IoSession recycle(SocketAddress localAddress, SocketAddress remoteAddress);
+
+    /**
+     * Called when an [EMAIL PROTECTED] IoSession} is explicitly closed.
+     *
+     * @param session
+     *            the new [EMAIL PROTECTED] IoSession}.
+     */
+    void remove(IoSession session);
+}

Propchange: 
mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java?rev=579942&r1=579941&r2=579942&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java
 Thu Sep 27 01:32:13 2007
@@ -28,22 +28,21 @@
 import java.util.Iterator;
 import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 
 import org.apache.mina.common.AbstractIoAcceptor;
 import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.ExpiringSessionRecycler;
 import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.common.IoFuture;
-import org.apache.mina.common.IoFutureListener;
+import org.apache.mina.common.IoProcessor;
 import org.apache.mina.common.IoServiceListenerSupport;
 import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionRecycler;
 import org.apache.mina.common.RuntimeIoException;
 import org.apache.mina.common.TransportMetadata;
+import org.apache.mina.common.WriteRequest;
 import org.apache.mina.transport.socket.DatagramSessionConfig;
 import org.apache.mina.transport.socket.DefaultDatagramSessionConfig;
 import org.apache.mina.util.NamePreservingRunnable;
@@ -57,27 +56,27 @@
  */
 public class DatagramAcceptor extends AbstractIoAcceptor implements
         org.apache.mina.transport.socket.DatagramAcceptor {
+    private static final IoSessionRecycler DEFAULT_RECYCLER = new 
ExpiringSessionRecycler();
 
     private static volatile int nextId = 0;
 
+    private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
+
     private final Executor executor;
 
     private final int id = nextId++;
 
     private final Selector selector;
-    
-    private final DatagramConnector connector;
+
+    private final IoProcessor processor = new DatagramAcceptorProcessor();
 
     private DatagramChannel channel;
 
-    private final Queue<ServiceOperationFuture> registerQueue =
-        new ConcurrentLinkedQueue<ServiceOperationFuture>();
+    private final Queue<ServiceOperationFuture> registerQueue = new 
ConcurrentLinkedQueue<ServiceOperationFuture>();
 
-    private final Queue<ServiceOperationFuture> cancelQueue =
-        new ConcurrentLinkedQueue<ServiceOperationFuture>();
-    
-    private final ConcurrentMap<SocketAddress, Object> cache =
-        new ConcurrentHashMap<SocketAddress, Object>();
+    private final Queue<ServiceOperationFuture> cancelQueue = new 
ConcurrentLinkedQueue<ServiceOperationFuture>();
+
+    private final Queue<DatagramSessionImpl> flushingSessions = new 
ConcurrentLinkedQueue<DatagramSessionImpl>();
 
     private Worker worker;
 
@@ -92,13 +91,6 @@
      * Creates a new instance.
      */
     public DatagramAcceptor(Executor executor) {
-        this(Runtime.getRuntime().availableProcessors() + 1, executor);
-    }
-
-    /**
-     * Creates a new instance.
-     */
-    public DatagramAcceptor(int processorCount, Executor executor) {
         super(new DefaultDatagramSessionConfig());
 
         try {
@@ -108,11 +100,6 @@
         }
 
         this.executor = executor;
-        this.connector = new DatagramConnector(
-                this, "DatagramAcceptor-" + id, processorCount, executor);
-
-        // The default reuseAddress should be 'true' for an accepted socket.
-        getSessionConfig().setReuseAddress(true);
     }
 
     @Override
@@ -138,51 +125,40 @@
     public InetSocketAddress getLocalAddress() {
         return (InetSocketAddress) super.getLocalAddress();
     }
-
-    // This method is added to work around a problem with
-    // bean property access mechanism.
-
-    /**
-     * @see 
org.apache.mina.common.AbstractIoAcceptor#setLocalAddress(java.net.SocketAddress)
-     * @param localAddress the local address
-     */
-    public void setLocalAddress(InetSocketAddress localAddress) {
-        super.setLocalAddress(localAddress);
-    }
     
-    @Override
-    protected IoServiceListenerSupport getListeners() {
-        return super.getListeners();
+    public void setLocalAddress(InetSocketAddress localAddress) {
+        setLocalAddress((SocketAddress) localAddress);
     }
 
     @Override
     protected void doBind() throws Exception {
-        ServiceOperationFuture future = new ServiceOperationFuture();
+        ServiceOperationFuture request = new ServiceOperationFuture();
 
-        registerQueue.add(future);
+        registerQueue.add(request);
         startupWorker();
         selector.wakeup();
 
-        future.awaitUninterruptibly();
+        request.awaitUninterruptibly();
 
-        if (future.getException() != null) {
-            throw future.getException();
+        if (request.getException() != null) {
+            throw request.getException();
         }
-
+        
         setLocalAddress(channel.socket().getLocalSocketAddress());
     }
 
     @Override
     protected void doUnbind() throws Exception {
-        ServiceOperationFuture future = new ServiceOperationFuture();
+        ServiceOperationFuture request = new ServiceOperationFuture();
 
-        cancelQueue.add(future);
+        cancelQueue.add(request);
         startupWorker();
         selector.wakeup();
 
-        future.awaitUninterruptibly();
-        if (future.getException() != null) {
-            throw future.getException();
+        request.awaitUninterruptibly();
+        
+        if (request.getException() != null) {
+            throw request.getException();
         }
     }
 
@@ -196,28 +172,98 @@
                 throw new IllegalStateException(
                         "Can't create a session from a unbound service.");
             }
-            
-            Object data;
-            synchronized (cache) { 
-                data = cache.get(remoteAddress);
-                if (data == null) {
-                    ConnectFuture future = connector.connect(remoteAddress, 
getLocalAddress());
-                    cache.put(remoteAddress, future);
-                    future.awaitUninterruptibly();
-                    return future.getSession();
-                }
-            }
-            
-            if (data instanceof ConnectFuture) {
-                ConnectFuture future = (ConnectFuture) data;
-                future.awaitUninterruptibly();
-                return future.getSession();
-            } else if (data instanceof IoSession) {
-                return ((IoSession) data);
-            } else {
-                throw new IllegalStateException();
+
+            return newSessionWithoutLock(remoteAddress);
+        }
+    }
+
+    private IoSession newSessionWithoutLock(SocketAddress remoteAddress) {
+        Selector selector = this.selector;
+        DatagramChannel ch = this.channel;
+        SelectionKey key = ch.keyFor(selector);
+
+        IoSession session;
+        IoSessionRecycler sessionRecycler = getSessionRecycler();
+        synchronized (sessionRecycler) {
+            session = sessionRecycler.recycle(getLocalAddress(), 
remoteAddress);
+            if (session != null) {
+                return session;
+            }
+
+            // If a new session needs to be created.
+            DatagramSessionImpl datagramSession = new DatagramSessionImpl(
+                    this, ch, processor, remoteAddress);
+            datagramSession.setSelectionKey(key);
+
+            getSessionRecycler().put(datagramSession);
+            session = datagramSession;
+        }
+
+        try {
+            
this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+            getListeners().fireSessionCreated(session);
+        } catch (Throwable t) {
+            ExceptionMonitor.getInstance().exceptionCaught(t);
+        }
+
+        return session;
+    }
+
+    /**
+     * Returns the [EMAIL PROTECTED] IoSessionRecycler} for this service.
+     */
+    public IoSessionRecycler getSessionRecycler() {
+        return sessionRecycler;
+    }
+
+    /**
+     * Sets the [EMAIL PROTECTED] IoSessionRecycler} for this service.
+     *
+     * @param sessionRecycler <tt>null</tt> to use the default recycler
+     */
+    public void setSessionRecycler(IoSessionRecycler sessionRecycler) {
+        synchronized (bindLock) {
+            if (isBound()) {
+                throw new IllegalStateException(
+                        "sessionRecycler can't be set while the acceptor is 
bound.");
+            }
+
+            if (sessionRecycler == null) {
+                sessionRecycler = DEFAULT_RECYCLER;
+            }
+            this.sessionRecycler = sessionRecycler;
+        }
+    }
+
+    @Override
+    protected IoServiceListenerSupport getListeners() {
+        return super.getListeners();
+    }
+
+    IoProcessor getProcessor() {
+        return processor;
+    }
+
+    private class DatagramAcceptorProcessor implements IoProcessor {
+
+        public void add(IoSession session) {
+        }
+
+        public void flush(IoSession session) {
+            if (scheduleFlush((DatagramSessionImpl) session)) {
+                Selector selector = DatagramAcceptor.this.selector;
+                if (selector != null) {
+                    selector.wakeup();
+                }
             }
         }
+
+        public void remove(IoSession session) {
+            getListeners().fireSessionDestroyed(session);
+        }
+
+        public void updateTrafficMask(IoSession session) {
+        }
     }
 
     private synchronized void startupWorker() {
@@ -227,6 +273,15 @@
         }
     }
 
+    private boolean scheduleFlush(DatagramSessionImpl session) {
+        if (session.setScheduledForFlush(true)) {
+            flushingSessions.add(session);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
     private class Worker implements Runnable {
         public void run() {
             Thread.currentThread().setName("DatagramAcceptor-" + id);
@@ -241,6 +296,7 @@
                         processReadySessions(selector.selectedKeys());
                     }
 
+                    flushSessions();
                     cancelKeys();
 
                     if (selector.keys().isEmpty()) {
@@ -277,6 +333,12 @@
                 if (key.isReadable()) {
                     readSession(ch);
                 }
+
+                if (key.isWritable()) {
+                    for (IoSession session : getManagedSessions()) {
+                        scheduleFlush((DatagramSessionImpl) session);
+                    }
+                }
             } catch (Throwable t) {
                 ExceptionMonitor.getInstance().exceptionCaught(t);
             }
@@ -284,55 +346,104 @@
     }
 
     private void readSession(DatagramChannel channel) throws Exception {
-        final ByteBuffer readBuf = ByteBuffer.allocate(getSessionConfig()
-                .getReadBufferSize());
+        ByteBuffer readBuf = ByteBuffer.allocate(getSessionConfig()
+                .getReceiveBufferSize());
 
-        final SocketAddress remoteAddress = channel.receive(readBuf.buf());
+        SocketAddress remoteAddress = channel.receive(readBuf.buf());
         if (remoteAddress != null) {
+            DatagramSessionImpl session = (DatagramSessionImpl) 
newSessionWithoutLock(remoteAddress);
+
             readBuf.flip();
-            Object data;
-            ConnectFuture future = null;
-            synchronized (cache) {
-                data = cache.get(remoteAddress);
-                if (data == null) {
-                    future = connector.connect(remoteAddress, 
getLocalAddress());
-                    cache.put(remoteAddress, future);
-                }
-            }
-            
-            if (data == null) {
-                future.addListener(new IoFutureListener() {
-                    public void operationComplete(IoFuture future) {
-                        ConnectFuture f = (ConnectFuture) future;
-                        if (f.getException() == null) {
-                            IoSession s = f.getSession();
-                            cache.put(remoteAddress, s);
-                            s.getCloseFuture().addListener(new 
IoFutureListener() {
-                                public void operationComplete(IoFuture future) 
{
-                                    cache.remove(remoteAddress);
-                                }
-                            });
-                            s.getFilterChain().fireMessageReceived(readBuf);
-                        } else {
-                            
ExceptionMonitor.getInstance().exceptionCaught(f.getException());
-                        }
-                    }
-                });
-            } else if (data instanceof ConnectFuture) {
-                future = (ConnectFuture) data;
-                future.addListener(new IoFutureListener() {
-                    public void operationComplete(IoFuture future) {
-                        ConnectFuture f = (ConnectFuture) future;
-                        if (f.getException() == null) {
-                            IoSession s = f.getSession();
-                            s.getFilterChain().fireMessageReceived(readBuf);
-                        }
-                    }
-                });
-            } else if (data instanceof IoSession) {
-                ((IoSession) 
data).getFilterChain().fireMessageReceived(readBuf);
+
+            ByteBuffer newBuf = ByteBuffer.allocate(readBuf.limit());
+            newBuf.put(readBuf);
+            newBuf.flip();
+
+            session.increaseReadBytes(newBuf.remaining());
+            session.getFilterChain().fireMessageReceived(newBuf);
+        }
+    }
+
+    private void flushSessions() {
+        for (; ;) {
+            DatagramSessionImpl session = flushingSessions.poll();
+            if (session == null) {
+                break;
+            }
+
+            session.setScheduledForFlush(false);
+
+            try {
+                boolean flushedAll = flush(session);
+                if (flushedAll && !session.getWriteRequestQueue().isEmpty() && 
!session.isScheduledForFlush()) {
+                    scheduleFlush(session);
+                }
+            } catch (IOException e) {
+                session.getFilterChain().fireExceptionCaught(e);
+            }
+        }
+    }
+
+    private boolean flush(DatagramSessionImpl session) throws IOException {
+        // Clear OP_WRITE
+        SelectionKey key = session.getSelectionKey();
+        if (key == null) {
+            scheduleFlush(session);
+            return false;
+        }
+        if (!key.isValid()) {
+            return false;
+        }
+        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
+
+        DatagramChannel ch = session.getChannel();
+        Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
+
+        int writtenBytes = 0;
+        int maxWrittenBytes = session.getConfig().getSendBufferSize() << 1;
+        try {
+            for (; ;) {
+                WriteRequest req = writeRequestQueue.peek();
+                if (req == null) {
+                    break;
+                }
+
+                ByteBuffer buf = (ByteBuffer) req.getMessage();
+                if (buf.remaining() == 0) {
+                    // pop and fire event
+                    writeRequestQueue.poll();
+                    session.increaseWrittenMessages();
+                    buf.reset();
+                    session.getFilterChain().fireMessageSent(req);
+                    continue;
+                }
+
+                SocketAddress destination = req.getDestination();
+                if (destination == null) {
+                    destination = session.getRemoteAddress();
+                }
+
+                int localWrittenBytes = ch.send(buf.buf(), destination);
+                if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) 
{
+                    // Kernel buffer is full or wrote too much
+                    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+                    return false;
+                } else {
+                    key.interestOps(key.interestOps() & 
~SelectionKey.OP_WRITE);
+
+                    // pop and fire event
+                    writeRequestQueue.poll();
+                    writtenBytes += localWrittenBytes;
+                    session.increaseWrittenMessages();
+                    buf.reset();
+                    session.getFilterChain().fireMessageSent(req);
+                }
             }
+        } finally {
+            session.increaseWrittenBytes(writtenBytes);
         }
+
+        return true;
     }
 
     private void registerNew() {
@@ -341,8 +452,8 @@
         }
 
         for (; ;) {
-            ServiceOperationFuture future = registerQueue.poll();
-            if (future == null) {
+            ServiceOperationFuture req = registerQueue.poll();
+            if (req == null) {
                 break;
             }
 
@@ -361,14 +472,15 @@
 
                 ch.configureBlocking(false);
                 ch.socket().bind(getLocalAddress());
-                ch.register(selector, SelectionKey.OP_READ, future);
+                ch.register(selector, SelectionKey.OP_READ, req);
                 this.channel = ch;
 
-                future.setDone();
+                getListeners().fireServiceActivated();
+                req.setDone();
             } catch (Exception e) {
-                future.setException(e);
+                req.setException(e);
             } finally {
-                if (ch != null && future.getException() != null) {
+                if (ch != null && req.getException() != null) {
                     try {
                         ch.disconnect();
                         ch.close();
@@ -382,8 +494,8 @@
 
     private void cancelKeys() {
         for (; ;) {
-            ServiceOperationFuture future = cancelQueue.poll();
-            if (future == null) {
+            ServiceOperationFuture request = cancelQueue.poll();
+            if (request == null) {
                 break;
             }
 
@@ -400,8 +512,8 @@
             } catch (Throwable t) {
                 ExceptionMonitor.getInstance().exceptionCaught(t);
             } finally {
-                future.setDone();
                 getListeners().fireServiceDeactivated();
+                request.setDone();
             }
         }
     }

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java?rev=579942&r1=579941&r2=579942&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
 Thu Sep 27 01:32:13 2007
@@ -28,13 +28,8 @@
 import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.DefaultConnectFuture;
 import org.apache.mina.common.DefaultIoFilterChain;
-import org.apache.mina.common.DefaultIoFilterChainBuilder;
 import org.apache.mina.common.ExceptionMonitor;
 import org.apache.mina.common.IoConnector;
-import org.apache.mina.common.IoFilterChainBuilder;
-import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.IoService;
-import org.apache.mina.common.IoServiceListenerSupport;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.TransportMetadata;
 import org.apache.mina.transport.socket.DatagramSessionConfig;
@@ -53,7 +48,7 @@
     private static volatile int nextId = 0;
 
     private final int id = nextId++;
-    private final IoService parent;
+    private final String threadName = "DatagramConnector-" + id;
     private final int processorCount;
     private final NioProcessor[] ioProcessors;
 
@@ -77,22 +72,8 @@
      * Creates a new instance.
      */
     public DatagramConnector(int processorCount, Executor executor) {
-        this(null, null, processorCount, executor);
-    }
-    
-    DatagramConnector(
-            IoService parent, String threadNamePrefix, int processorCount, 
Executor executor) {
         super(new DefaultDatagramSessionConfig());
         
-        // DotagramAcceptor can use DatagramConnector as a child.
-        if (parent == null) {
-            parent = this;
-        }
-        if (threadNamePrefix == null) {
-            threadNamePrefix = "DatagramConnector-" + id;
-        }
-        this.parent = parent;
-        
         if (processorCount < 1) {
             throw new IllegalArgumentException(
                     "Must have at least one processor");
@@ -105,7 +86,7 @@
         // handling sessions.
         for (int i = 0; i < processorCount; i++) {
             ioProcessors[i] = new NioProcessor(
-                    threadNamePrefix + '.' + i, executor);
+                    threadName + '.' + i, executor);
         }
     }
 
@@ -122,66 +103,8 @@
     }
 
     @Override
-    protected IoServiceListenerSupport getListeners() {
-        if (parent == this) {
-            return super.getListeners();
-        } else {
-            return ((DatagramAcceptor) parent).getListeners();
-        }
-    }
-
-    @Override
     public DatagramSessionConfig getSessionConfig() {
-        if (parent == this) {
-            return (DatagramSessionConfig) super.getSessionConfig();
-        } else {
-            return (DatagramSessionConfig) parent.getSessionConfig();
-        }
-    }
-
-    @Override
-    public DefaultIoFilterChainBuilder getFilterChain() {
-        if (parent == this) {
-            return super.getFilterChain();
-        } else {
-            return parent.getFilterChain();
-        }
-    }
-
-    @Override
-    public IoFilterChainBuilder getFilterChainBuilder() {
-        if (parent == this) {
-            return super.getFilterChainBuilder();
-        } else {
-            return parent.getFilterChainBuilder();
-        }
-    }
-
-    @Override
-    public void setFilterChainBuilder(IoFilterChainBuilder builder) {
-        if (parent == this) {
-            super.setFilterChainBuilder(builder);
-        } else {
-            parent.setFilterChainBuilder(builder);
-        }
-    }
-
-    @Override
-    public IoHandler getHandler() {
-        if (parent == this) {
-            return super.getHandler();
-        } else {
-            return parent.getHandler();
-        }
-    }
-
-    @Override
-    public void setHandler(IoHandler handler) {
-        if (parent == this) {
-            super.setHandler(handler);
-        } else {
-            parent.setHandler(handler);
-        }
+        return (DatagramSessionConfig) super.getSessionConfig();
     }
 
     @Override
@@ -202,7 +125,7 @@
             ch.connect(remoteAddress);
 
             NioProcessor processor = nextProcessor();
-            session = new DatagramSessionImpl(parent, ch, processor);
+            session = new DatagramSessionImpl(this, ch, processor);
             ConnectFuture future = new DefaultConnectFuture();
             // DefaultIoFilterChain will notify the connect future.
             session.setAttribute(DefaultIoFilterChain.CONNECT_FUTURE, future);

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java?rev=579942&r1=579941&r2=579942&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java
 Thu Sep 27 01:32:13 2007
@@ -74,20 +74,28 @@
     private SelectionKey key;
 
     /**
-     * Creates a new connector instance.
+     * Creates a new acceptor-side session instance.
      */
     DatagramSessionImpl(IoService service,
-                        DatagramChannel ch, IoProcessor processor) {
+                        DatagramChannel ch, IoProcessor processor,
+                        SocketAddress remoteAddress) {
         this.service = service;
         this.ch = ch;
         this.handler = service.getHandler();
         this.processor = processor;
-        this.remoteAddress = (InetSocketAddress) ch.socket()
-                .getRemoteSocketAddress();
+        this.remoteAddress = (InetSocketAddress) remoteAddress;
         this.localAddress = (InetSocketAddress) ch.socket()
                 .getLocalSocketAddress();
 
         this.config.setAll(service.getSessionConfig());
+    }
+
+    /**
+     * Creates a new connector-side session instance.
+     */
+    DatagramSessionImpl(IoService service,
+                        DatagramChannel ch, IoProcessor processor) {
+        this(service, ch, processor, ch.socket().getRemoteSocketAddress());
     }
 
     public IoService getService() {

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java?rev=579942&r1=579941&r2=579942&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
 Thu Sep 27 01:32:13 2007
@@ -170,50 +170,28 @@
         }
     }
 
-    /**
-     * @see org.apache.mina.common.IoService#getTransportMetadata()
-     */
     public TransportMetadata getTransportMetadata() {
         return SocketSessionImpl.METADATA;
     }
 
-    /**
-     * @see org.apache.mina.common.AbstractIoService#getSessionConfig()
-     */
     @Override
     public SocketSessionConfig getSessionConfig() {
         return (SocketSessionConfig) super.getSessionConfig();
     }
 
-    /**
-     * @see org.apache.mina.common.AbstractIoAcceptor#getLocalAddress()
-     */
     @Override
     public InetSocketAddress getLocalAddress() {
         return (InetSocketAddress) super.getLocalAddress();
     }
 
-    // This method is added to work around a problem with
-    // bean property access mechanism.
-
-    /**
-     * @see 
org.apache.mina.common.AbstractIoAcceptor#setLocalAddress(java.net.SocketAddress)
-     * @param localAddress the local address
-     */
     public void setLocalAddress(InetSocketAddress localAddress) {
-        super.setLocalAddress(localAddress);
+        setLocalAddress((SocketAddress) localAddress);
     }
 
-    /**
-     * @see ServerSocket#getReuseAddress()
-     */
     public boolean isReuseAddress() {
         return reuseAddress;
     }
 
-    /**
-     * @see ServerSocket#setReuseAddress(boolean)
-     */
     public void setReuseAddress(boolean reuseAddress) {
         synchronized (bindLock) {
             if (isBound()) {
@@ -225,21 +203,10 @@
         }
     }
 
-    /**
-     * Returns the size of the backlog.
-     *
-     * @return
-     */
     public int getBacklog() {
         return backlog;
     }
 
-    /**
-     * Sets the size of the backlog.  This can only be done when this
-     * class is not bound
-     *
-     * @param backlog
-     */
     public void setBacklog(int backlog) {
         synchronized (bindLock) {
             if (isBound()) {

Added: 
mina/trunk/core/src/main/java/org/apache/mina/util/ExpirationListener.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/util/ExpirationListener.java?rev=579942&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/util/ExpirationListener.java 
(added)
+++ mina/trunk/core/src/main/java/org/apache/mina/util/ExpirationListener.java 
Thu Sep 27 01:32:13 2007
@@ -0,0 +1,30 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.util;
+
+/**
+ * A listener for expired object events.
+ *
+ * @author The Apache Directory Project ([EMAIL PROTECTED])
+ * TODO Make this a inner interface of ExpiringMap
+ */
+public interface ExpirationListener<E> {
+    void expired(E expiredObject);
+}

Propchange: 
mina/trunk/core/src/main/java/org/apache/mina/util/ExpirationListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
mina/trunk/core/src/main/java/org/apache/mina/util/ExpirationListener.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: mina/trunk/core/src/main/java/org/apache/mina/util/ExpiringMap.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/util/ExpiringMap.java?rev=579942&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/util/ExpiringMap.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/util/ExpiringMap.java Thu Sep 
27 01:32:13 2007
@@ -0,0 +1,386 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.util;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A map with expiration.
+ *
+ * @author The Apache Directory Project ([EMAIL PROTECTED])
+ */
+public class ExpiringMap<K, V> implements Map<K, V> {
+    public static final int DEFAULT_TIME_TO_LIVE = 60;
+
+    public static final int DEFAULT_EXPIRATION_INTERVAL = 1;
+
+    private static volatile int expirerCount = 1;
+
+    private final ConcurrentHashMap<K, ExpiringObject> delegate;
+
+    private final CopyOnWriteArrayList<ExpirationListener<V>> 
expirationListeners;
+
+    private final Expirer expirer;
+
+    public ExpiringMap() {
+        this(DEFAULT_TIME_TO_LIVE, DEFAULT_EXPIRATION_INTERVAL);
+    }
+
+    public ExpiringMap(int timeToLive) {
+        this(timeToLive, DEFAULT_EXPIRATION_INTERVAL);
+    }
+
+    public ExpiringMap(int timeToLive, int expirationInterval) {
+        this(new ConcurrentHashMap<K, ExpiringObject>(),
+                new CopyOnWriteArrayList<ExpirationListener<V>>(), timeToLive,
+                expirationInterval);
+    }
+
+    private ExpiringMap(ConcurrentHashMap<K, ExpiringObject> delegate,
+            CopyOnWriteArrayList<ExpirationListener<V>> expirationListeners,
+            int timeToLive, int expirationInterval) {
+        this.delegate = delegate;
+        this.expirationListeners = expirationListeners;
+
+        this.expirer = new Expirer();
+        expirer.setTimeToLive(timeToLive);
+        expirer.setExpirationInterval(expirationInterval);
+    }
+
+    public V put(K key, V value) {
+        ExpiringObject answer = delegate.put(key, new ExpiringObject(key,
+                value, System.currentTimeMillis()));
+        if (answer == null) {
+            return null;
+        }
+
+        return answer.getValue();
+    }
+
+    public V get(Object key) {
+        ExpiringObject object = delegate.get(key);
+
+        if (object != null) {
+            object.setLastAccessTime(System.currentTimeMillis());
+
+            return object.getValue();
+        }
+
+        return null;
+    }
+
+    public V remove(Object key) {
+        ExpiringObject answer = delegate.remove(key);
+        if (answer == null) {
+            return null;
+        }
+
+        return answer.getValue();
+    }
+
+    public boolean containsKey(Object key) {
+        return delegate.containsKey(key);
+    }
+
+    public boolean containsValue(Object value) {
+        return delegate.containsValue(value);
+    }
+
+    public int size() {
+        return delegate.size();
+    }
+
+    public boolean isEmpty() {
+        return delegate.isEmpty();
+    }
+
+    public void clear() {
+        delegate.clear();
+    }
+
+    @Override
+    public int hashCode() {
+        return delegate.hashCode();
+    }
+
+    public Set<K> keySet() {
+        return delegate.keySet();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        return delegate.equals(obj);
+    }
+
+    public void putAll(Map<? extends K, ? extends V> inMap) {
+        for (Entry<? extends K, ? extends V> e : inMap.entrySet()) {
+            this.put(e.getKey(), e.getValue());
+        }
+    }
+
+    public Collection<V> values() {
+        throw new UnsupportedOperationException();
+    }
+
+    public Set<Map.Entry<K, V>> entrySet() {
+        throw new UnsupportedOperationException();
+    }
+
+    public void addExpirationListener(ExpirationListener<V> listener) {
+        expirationListeners.add(listener);
+    }
+
+    public void removeExpirationListener(
+            ExpirationListener<V> listener) {
+        expirationListeners.remove(listener);
+    }
+
+    public Expirer getExpirer() {
+        return expirer;
+    }
+
+    public int getExpirationInterval() {
+        return expirer.getExpirationInterval();
+    }
+
+    public int getTimeToLive() {
+        return expirer.getTimeToLive();
+    }
+
+    public void setExpirationInterval(int expirationInterval) {
+        expirer.setExpirationInterval(expirationInterval);
+    }
+
+    public void setTimeToLive(int timeToLive) {
+        expirer.setTimeToLive(timeToLive);
+    }
+
+    private class ExpiringObject {
+        private K key;
+
+        private V value;
+
+        private long lastAccessTime;
+
+        private final ReadWriteLock lastAccessTimeLock = new 
ReentrantReadWriteLock();
+
+        ExpiringObject(K key, V value, long lastAccessTime) {
+            if (value == null) {
+                throw new IllegalArgumentException(
+                        "An expiring object cannot be null.");
+            }
+
+            this.key = key;
+            this.value = value;
+            this.lastAccessTime = lastAccessTime;
+        }
+
+        public long getLastAccessTime() {
+            lastAccessTimeLock.readLock().lock();
+
+            try {
+                return lastAccessTime;
+            } finally {
+                lastAccessTimeLock.readLock().unlock();
+            }
+        }
+
+        public void setLastAccessTime(long lastAccessTime) {
+            lastAccessTimeLock.writeLock().lock();
+
+            try {
+                this.lastAccessTime = lastAccessTime;
+            } finally {
+                lastAccessTimeLock.writeLock().unlock();
+            }
+        }
+
+        public K getKey() {
+            return key;
+        }
+
+        public V getValue() {
+            return value;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            return value.equals(obj);
+        }
+
+        @Override
+        public int hashCode() {
+            return value.hashCode();
+        }
+    }
+
+    public class Expirer implements Runnable {
+        private final ReadWriteLock stateLock = new ReentrantReadWriteLock();
+
+        private long timeToLiveMillis;
+
+        private long expirationIntervalMillis;
+
+        private boolean running = false;
+
+        private final Thread expirerThread;
+
+        public Expirer() {
+            expirerThread = new Thread(this, "ExpiringMapExpirer-"
+                    + expirerCount++);
+            expirerThread.setDaemon(true);
+        }
+
+        public void run() {
+            while (running) {
+                processExpires();
+
+                try {
+                    Thread.sleep(expirationIntervalMillis);
+                } catch (InterruptedException e) {
+                }
+            }
+        }
+
+        private void processExpires() {
+            long timeNow = System.currentTimeMillis();
+
+            for (ExpiringObject o : delegate.values()) {
+
+                if (timeToLiveMillis <= 0) {
+                    continue;
+                }
+
+                long timeIdle = timeNow - o.getLastAccessTime();
+
+                if (timeIdle >= timeToLiveMillis) {
+                    delegate.remove(o.getKey());
+
+                    for (ExpirationListener<V> listener : expirationListeners) 
{
+                        listener.expired(o.getValue());
+                    }
+                }
+            }
+        }
+
+        public void startExpiring() {
+            stateLock.writeLock().lock();
+
+            try {
+                if (!running) {
+                    running = true;
+                    expirerThread.start();
+                }
+            } finally {
+                stateLock.writeLock().unlock();
+            }
+        }
+
+        public void startExpiringIfNotStarted() {
+            stateLock.readLock().lock();
+            try {
+                if (running) {
+                    return;
+                }
+            } finally {
+                stateLock.readLock().unlock();
+            }
+
+            stateLock.writeLock().lock();
+            try {
+                if (!running) {
+                    running = true;
+                    expirerThread.start();
+                }
+            } finally {
+                stateLock.writeLock().unlock();
+            }
+        }
+
+        public void stopExpiring() {
+            stateLock.writeLock().lock();
+
+            try {
+                if (running) {
+                    running = false;
+                    expirerThread.interrupt();
+                }
+            } finally {
+                stateLock.writeLock().unlock();
+            }
+        }
+
+        public boolean isRunning() {
+            stateLock.readLock().lock();
+
+            try {
+                return running;
+            } finally {
+                stateLock.readLock().unlock();
+            }
+        }
+
+        public int getTimeToLive() {
+            stateLock.readLock().lock();
+
+            try {
+                return (int) timeToLiveMillis / 1000;
+            } finally {
+                stateLock.readLock().unlock();
+            }
+        }
+
+        public void setTimeToLive(long timeToLive) {
+            stateLock.writeLock().lock();
+
+            try {
+                this.timeToLiveMillis = timeToLive * 1000;
+            } finally {
+                stateLock.writeLock().unlock();
+            }
+        }
+
+        public int getExpirationInterval() {
+            stateLock.readLock().lock();
+
+            try {
+                return (int) expirationIntervalMillis / 1000;
+            } finally {
+                stateLock.readLock().unlock();
+            }
+        }
+
+        public void setExpirationInterval(long expirationInterval) {
+            stateLock.writeLock().lock();
+
+            try {
+                this.expirationIntervalMillis = expirationInterval * 1000;
+            } finally {
+                stateLock.writeLock().unlock();
+            }
+        }
+    }
+}

Propchange: mina/trunk/core/src/main/java/org/apache/mina/util/ExpiringMap.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: mina/trunk/core/src/main/java/org/apache/mina/util/ExpiringMap.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: 
mina/trunk/core/src/test/java/org/apache/mina/transport/AbstractTrafficControlTest.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/transport/AbstractTrafficControlTest.java?rev=579942&r1=579941&r2=579942&view=diff
==============================================================================
--- 
mina/trunk/core/src/test/java/org/apache/mina/transport/AbstractTrafficControlTest.java
 (original)
+++ 
mina/trunk/core/src/test/java/org/apache/mina/transport/AbstractTrafficControlTest.java
 Thu Sep 27 01:32:13 2007
@@ -92,6 +92,8 @@
             assertEquals("1", getSent(session));
 
             session.suspendRead();
+            
+            Thread.sleep(100);
 
             write(session, "2");
             assertFalse(canRead(session));
@@ -100,6 +102,8 @@
 
             session.suspendWrite();
 
+            Thread.sleep(100);
+
             write(session, "3");
             assertFalse(canRead(session));
             assertEquals("1", getReceived(session));
@@ -107,6 +111,8 @@
 
             session.resumeRead();
 
+            Thread.sleep(100);
+
             write(session, "4");
             assertEquals('2', read(session));
             assertEquals("12", getReceived(session));
@@ -114,6 +120,8 @@
 
             session.resumeWrite();
 
+            Thread.sleep(100);
+
             assertEquals('3', read(session));
             assertEquals('4', read(session));
 
@@ -124,6 +132,8 @@
 
             session.suspendWrite();
 
+            Thread.sleep(100);
+
             write(session, "6");
             assertFalse(canRead(session));
             assertEquals("12345", getReceived(session));
@@ -132,12 +142,17 @@
             session.suspendRead();
             session.resumeWrite();
 
+            Thread.sleep(100);
+
             write(session, "7");
             assertFalse(canRead(session));
             assertEquals("12345", getReceived(session));
             assertEquals("1234567", getSent(session));
 
             session.resumeRead();
+
+            Thread.sleep(100);
+
             assertEquals('6', read(session));
             assertEquals('7', read(session));
 

Added: 
mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java?rev=579942&view=auto
==============================================================================
--- 
mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java
 (added)
+++ 
mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java
 Thu Sep 27 01:32:13 2007
@@ -0,0 +1,149 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.net.InetSocketAddress;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.ExpiringSessionRecycler;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.util.AvailablePortFinder;
+
+/**
+ * Tests if datagram sessions are recycled properly.
+ *
+ * @author The Apache MINA Project ([EMAIL PROTECTED])
+ * @version $Rev$, $Date$
+ */
+public class DatagramRecyclerTest extends TestCase {
+    private final DatagramAcceptor acceptor = new DatagramAcceptor();
+
+    private final DatagramConnector connector = new DatagramConnector();
+
+    public DatagramRecyclerTest() {
+    }
+
+    public void testDatagramRecycler() throws Exception {
+        int port = AvailablePortFinder.getNextAvailable(1024);
+        ExpiringSessionRecycler recycler = new ExpiringSessionRecycler(1, 1);
+
+        MockHandler acceptorHandler = new MockHandler();
+        MockHandler connectorHandler = new MockHandler();
+
+        acceptor.setLocalAddress(new InetSocketAddress(port));
+        acceptor.setHandler(acceptorHandler);
+        acceptor.setSessionRecycler(recycler);
+        acceptor.bind();
+
+        try {
+            connector.setHandler(connectorHandler);
+            ConnectFuture future = connector.connect(new InetSocketAddress(
+                    "localhost", port));
+            future.awaitUninterruptibly();
+
+            // Write whatever to trigger the acceptor.
+            future.getSession().write(ByteBuffer.allocate(1))
+                    .awaitUninterruptibly();
+
+            // Close the client-side connection.
+            // This doesn't mean that the acceptor-side connection is also 
closed.
+            // The life cycle of the acceptor-side connection is managed by 
the recycler.
+            future.getSession().close();
+            future.getSession().getCloseFuture().awaitUninterruptibly();
+            Assert.assertTrue(future.getSession().getCloseFuture().isClosed());
+
+            // Wait until the acceptor-side connection is closed.
+            while (acceptorHandler.session == null) {
+                Thread.yield();
+            }
+            
acceptorHandler.session.getCloseFuture().awaitUninterruptibly(3000);
+
+            // Is it closed?
+            Assert.assertTrue(acceptorHandler.session.getCloseFuture()
+                    .isClosed());
+
+            Thread.sleep(1000);
+
+            Assert.assertEquals("CROPSECL", connectorHandler.result);
+            Assert.assertEquals("CROPRECL", acceptorHandler.result);
+        } finally {
+            acceptor.unbind();
+        }
+    }
+
+    private class MockHandler extends IoHandlerAdapter {
+        public IoSession session;
+
+        public String result = "";
+
+        @Override
+        public void exceptionCaught(IoSession session, Throwable cause)
+                throws Exception {
+            this.session = session;
+            result += "CA";
+        }
+
+        @Override
+        public void messageReceived(IoSession session, Object message)
+                throws Exception {
+            this.session = session;
+            result += "RE";
+        }
+
+        @Override
+        public void messageSent(IoSession session, Object message)
+                throws Exception {
+            this.session = session;
+            result += "SE";
+        }
+
+        @Override
+        public void sessionClosed(IoSession session) throws Exception {
+            this.session = session;
+            result += "CL";
+        }
+
+        @Override
+        public void sessionCreated(IoSession session) throws Exception {
+            this.session = session;
+            result += "CR";
+        }
+
+        @Override
+        public void sessionIdle(IoSession session, IdleStatus status)
+                throws Exception {
+            this.session = session;
+            result += "ID";
+        }
+
+        @Override
+        public void sessionOpened(IoSession session) throws Exception {
+            this.session = session;
+            result += "OP";
+        }
+
+    }
+}

Propchange: 
mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date


Reply via email to