Author: trustin
Date: Thu Sep 27 01:32:13 2007
New Revision: 579942
URL: http://svn.apache.org/viewvc?rev=579942&view=rev
Log:
* Reverted DatagramAcceptor because my experimental implementation behaves
unpredictably on some platforms.
Added:
mina/trunk/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java
(with props)
mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java
(with props)
mina/trunk/core/src/main/java/org/apache/mina/util/ExpirationListener.java
(with props)
mina/trunk/core/src/main/java/org/apache/mina/util/ExpiringMap.java (with
props)
mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java
(with props)
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
mina/trunk/core/src/test/java/org/apache/mina/transport/AbstractTrafficControlTest.java
Added:
mina/trunk/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java?rev=579942&view=auto
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java
(added)
+++
mina/trunk/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java
Thu Sep 27 01:32:13 2007
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.common;
+
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.mina.util.ExpirationListener;
+import org.apache.mina.util.ExpiringMap;
+
+/**
+ * An {@link IoSessionRecycler} with sessions that time out on inactivity.
+ * <p>
+ * Sessions are kept in an {@link ExpiringMap} whose key is a two-element list
+ * holding the session's remote address followed by its local address.
+ * <ul>
+ * <li><tt>put</tt> calls <tt>startExpiringIfNotStarted()</tt> on the map's
+ * expirer and stores the session only if the map has no entry for its key.</li>
+ * <li><tt>recycle</tt> returns whatever the map holds for the given
+ * (local, remote) address pair.</li>
+ * <li><tt>remove</tt> deletes the map entry for the session's key.</li>
+ * <li><tt>stopExpiring</tt> calls <tt>stopExpiring()</tt> on the map's
+ * expirer.</li>
+ * </ul>
+ * The listener registered on the map in the constructor closes every session
+ * handed to its <tt>expired</tt> callback. The time-to-live and expiration
+ * interval are passed through to the {@link ExpiringMap} unchanged; their
+ * units are defined by that class.
+ *
+ * TODO Document me.
+ *
+ * @author The Apache MINA Project ([EMAIL PROTECTED])
+ * @version $Rev$, $Date$
+ */
+public class ExpiringSessionRecycler implements IoSessionRecycler {
+ private ExpiringMap<Object, IoSession> sessionMap;
+
+ private ExpiringMap<Object, IoSession>.Expirer mapExpirer;
+
+ public ExpiringSessionRecycler() {
+ this(ExpiringMap.DEFAULT_TIME_TO_LIVE);
+ }
+
+ public ExpiringSessionRecycler(int timeToLive) {
+ this(timeToLive, ExpiringMap.DEFAULT_EXPIRATION_INTERVAL);
+ }
+
+ public ExpiringSessionRecycler(int timeToLive, int expirationInterval) {
+ sessionMap = new ExpiringMap<Object, IoSession>(timeToLive,
+ expirationInterval);
+ mapExpirer = sessionMap.getExpirer();
+ sessionMap.addExpirationListener(new DefaultExpirationListener());
+ }
+
+ public void put(IoSession session) {
+ mapExpirer.startExpiringIfNotStarted();
+
+ Object key = generateKey(session);
+
+ if (!sessionMap.containsKey(key)) {
+ sessionMap.put(key, session);
+ }
+ }
+
+ public IoSession recycle(SocketAddress localAddress,
+ SocketAddress remoteAddress) {
+ return sessionMap.get(generateKey(localAddress, remoteAddress));
+ }
+
+ public void remove(IoSession session) {
+ sessionMap.remove(generateKey(session));
+ }
+
+ public void stopExpiring() {
+ mapExpirer.stopExpiring();
+ }
+
+ public int getExpirationInterval() {
+ return sessionMap.getExpirationInterval();
+ }
+
+ public int getTimeToLive() {
+ return sessionMap.getTimeToLive();
+ }
+
+ public void setExpirationInterval(int expirationInterval) {
+ sessionMap.setExpirationInterval(expirationInterval);
+ }
+
+ public void setTimeToLive(int timeToLive) {
+ sessionMap.setTimeToLive(timeToLive);
+ }
+
+ private Object generateKey(IoSession session) {
+ return generateKey(session.getLocalAddress(), session
+ .getRemoteAddress());
+ }
+
+ private Object generateKey(SocketAddress localAddress,
+ SocketAddress remoteAddress) {
+ List<SocketAddress> key = new ArrayList<SocketAddress>(2);
+ key.add(remoteAddress);
+ key.add(localAddress);
+ return key;
+ }
+
+ private class DefaultExpirationListener implements
+ ExpirationListener<IoSession> {
+ public void expired(IoSession expiredSession) {
+ expiredSession.close();
+ }
+ }
+}
Propchange:
mina/trunk/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
mina/trunk/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java?rev=579942&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java
(added)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java
Thu Sep 27 01:32:13 2007
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.common;
+
+import java.net.SocketAddress;
+
+/**
+ * A connectionless transport can recycle existing sessions by assigning an
+ * {@link IoSessionRecycler} to an {@link IoService}.
+ *
+ * @author The Apache MINA Project ([EMAIL PROTECTED])
+ * TODO More documentation
+ */
+public interface IoSessionRecycler {
+ /**
+ * A dummy recycler that doesn't recycle any sessions. Using this recycler will
+ * cause all session lifecycle events to be fired for every I/O for all connectionless
+ * sessions. Its <tt>put</tt> and <tt>remove</tt> do nothing and its
+ * <tt>recycle</tt> always returns <tt>null</tt>.
+ */
+ static IoSessionRecycler NOOP = new IoSessionRecycler() {
+ public void put(IoSession session) {
+ }
+
+ public IoSession recycle(SocketAddress localAddress,
+ SocketAddress remoteAddress) {
+ return null;
+ }
+
+ public void remove(IoSession session) {
+ }
+ };
+
+ /**
+ * Called when the underlying transport creates or writes a new {@link IoSession}.
+ *
+ * @param session
+ * the new {@link IoSession}.
+ */
+ void put(IoSession session);
+
+ /**
+ * Attempts to retrieve a recycled {@link IoSession}.
+ *
+ * @param localAddress
+ * the local socket address of the {@link IoSession} the
+ * transport wants to recycle.
+ * @param remoteAddress
+ * the remote socket address of the {@link IoSession} the
+ * transport wants to recycle.
+ * @return a recycled {@link IoSession}, or null if one cannot be found.
+ */
+ IoSession recycle(SocketAddress localAddress, SocketAddress remoteAddress);
+
+ /**
+ * Called when an {@link IoSession} is explicitly closed.
+ *
+ * @param session
+ * the {@link IoSession} that is being removed.
+ */
+ void remove(IoSession session);
+}
Propchange:
mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java?rev=579942&r1=579941&r2=579942&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java
Thu Sep 27 01:32:13 2007
@@ -28,22 +28,21 @@
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import org.apache.mina.common.AbstractIoAcceptor;
import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.ExpiringSessionRecycler;
import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.common.IoFuture;
-import org.apache.mina.common.IoFutureListener;
+import org.apache.mina.common.IoProcessor;
import org.apache.mina.common.IoServiceListenerSupport;
import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionRecycler;
import org.apache.mina.common.RuntimeIoException;
import org.apache.mina.common.TransportMetadata;
+import org.apache.mina.common.WriteRequest;
import org.apache.mina.transport.socket.DatagramSessionConfig;
import org.apache.mina.transport.socket.DefaultDatagramSessionConfig;
import org.apache.mina.util.NamePreservingRunnable;
@@ -57,27 +56,27 @@
*/
public class DatagramAcceptor extends AbstractIoAcceptor implements
org.apache.mina.transport.socket.DatagramAcceptor {
+ private static final IoSessionRecycler DEFAULT_RECYCLER = new
ExpiringSessionRecycler();
private static volatile int nextId = 0;
+ private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
+
private final Executor executor;
private final int id = nextId++;
private final Selector selector;
-
- private final DatagramConnector connector;
+
+ private final IoProcessor processor = new DatagramAcceptorProcessor();
private DatagramChannel channel;
- private final Queue<ServiceOperationFuture> registerQueue =
- new ConcurrentLinkedQueue<ServiceOperationFuture>();
+ private final Queue<ServiceOperationFuture> registerQueue = new
ConcurrentLinkedQueue<ServiceOperationFuture>();
- private final Queue<ServiceOperationFuture> cancelQueue =
- new ConcurrentLinkedQueue<ServiceOperationFuture>();
-
- private final ConcurrentMap<SocketAddress, Object> cache =
- new ConcurrentHashMap<SocketAddress, Object>();
+ private final Queue<ServiceOperationFuture> cancelQueue = new
ConcurrentLinkedQueue<ServiceOperationFuture>();
+
+ private final Queue<DatagramSessionImpl> flushingSessions = new
ConcurrentLinkedQueue<DatagramSessionImpl>();
private Worker worker;
@@ -92,13 +91,6 @@
* Creates a new instance.
*/
public DatagramAcceptor(Executor executor) {
- this(Runtime.getRuntime().availableProcessors() + 1, executor);
- }
-
- /**
- * Creates a new instance.
- */
- public DatagramAcceptor(int processorCount, Executor executor) {
super(new DefaultDatagramSessionConfig());
try {
@@ -108,11 +100,6 @@
}
this.executor = executor;
- this.connector = new DatagramConnector(
- this, "DatagramAcceptor-" + id, processorCount, executor);
-
- // The default reuseAddress should be 'true' for an accepted socket.
- getSessionConfig().setReuseAddress(true);
}
@Override
@@ -138,51 +125,40 @@
public InetSocketAddress getLocalAddress() {
return (InetSocketAddress) super.getLocalAddress();
}
-
- // This method is added to work around a problem with
- // bean property access mechanism.
-
- /**
- * @see
org.apache.mina.common.AbstractIoAcceptor#setLocalAddress(java.net.SocketAddress)
- * @param localAddress the local address
- */
- public void setLocalAddress(InetSocketAddress localAddress) {
- super.setLocalAddress(localAddress);
- }
- @Override
- protected IoServiceListenerSupport getListeners() {
- return super.getListeners();
+ public void setLocalAddress(InetSocketAddress localAddress) {
+ setLocalAddress((SocketAddress) localAddress);
}
@Override
protected void doBind() throws Exception {
- ServiceOperationFuture future = new ServiceOperationFuture();
+ ServiceOperationFuture request = new ServiceOperationFuture();
- registerQueue.add(future);
+ registerQueue.add(request);
startupWorker();
selector.wakeup();
- future.awaitUninterruptibly();
+ request.awaitUninterruptibly();
- if (future.getException() != null) {
- throw future.getException();
+ if (request.getException() != null) {
+ throw request.getException();
}
-
+
setLocalAddress(channel.socket().getLocalSocketAddress());
}
@Override
protected void doUnbind() throws Exception {
- ServiceOperationFuture future = new ServiceOperationFuture();
+ ServiceOperationFuture request = new ServiceOperationFuture();
- cancelQueue.add(future);
+ cancelQueue.add(request);
startupWorker();
selector.wakeup();
- future.awaitUninterruptibly();
- if (future.getException() != null) {
- throw future.getException();
+ request.awaitUninterruptibly();
+
+ if (request.getException() != null) {
+ throw request.getException();
}
}
@@ -196,28 +172,98 @@
throw new IllegalStateException(
"Can't create a session from a unbound service.");
}
-
- Object data;
- synchronized (cache) {
- data = cache.get(remoteAddress);
- if (data == null) {
- ConnectFuture future = connector.connect(remoteAddress,
getLocalAddress());
- cache.put(remoteAddress, future);
- future.awaitUninterruptibly();
- return future.getSession();
- }
- }
-
- if (data instanceof ConnectFuture) {
- ConnectFuture future = (ConnectFuture) data;
- future.awaitUninterruptibly();
- return future.getSession();
- } else if (data instanceof IoSession) {
- return ((IoSession) data);
- } else {
- throw new IllegalStateException();
+
+ return newSessionWithoutLock(remoteAddress);
+ }
+ }
+
+ private IoSession newSessionWithoutLock(SocketAddress remoteAddress) {
+ Selector selector = this.selector;
+ DatagramChannel ch = this.channel;
+ SelectionKey key = ch.keyFor(selector);
+
+ IoSession session;
+ IoSessionRecycler sessionRecycler = getSessionRecycler();
+ synchronized (sessionRecycler) {
+ session = sessionRecycler.recycle(getLocalAddress(),
remoteAddress);
+ if (session != null) {
+ return session;
+ }
+
+ // If a new session needs to be created.
+ DatagramSessionImpl datagramSession = new DatagramSessionImpl(
+ this, ch, processor, remoteAddress);
+ datagramSession.setSelectionKey(key);
+
+ getSessionRecycler().put(datagramSession);
+ session = datagramSession;
+ }
+
+ try {
+
this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+ getListeners().fireSessionCreated(session);
+ } catch (Throwable t) {
+ ExceptionMonitor.getInstance().exceptionCaught(t);
+ }
+
+ return session;
+ }
+
+ /**
+ * Returns the {@link IoSessionRecycler} for this service.
+ */
+ public IoSessionRecycler getSessionRecycler() {
+ return sessionRecycler;
+ }
+
+ /**
+ * Sets the {@link IoSessionRecycler} for this service.
+ *
+ * @param sessionRecycler <tt>null</tt> to use the default recycler
+ */
+ public void setSessionRecycler(IoSessionRecycler sessionRecycler) {
+ synchronized (bindLock) {
+ if (isBound()) {
+ throw new IllegalStateException(
+ "sessionRecycler can't be set while the acceptor is
bound.");
+ }
+
+ if (sessionRecycler == null) {
+ sessionRecycler = DEFAULT_RECYCLER;
+ }
+ this.sessionRecycler = sessionRecycler;
+ }
+ }
+
+ @Override
+ protected IoServiceListenerSupport getListeners() {
+ return super.getListeners();
+ }
+
+ IoProcessor getProcessor() {
+ return processor;
+ }
+
+ private class DatagramAcceptorProcessor implements IoProcessor {
+
+ public void add(IoSession session) {
+ }
+
+ public void flush(IoSession session) {
+ if (scheduleFlush((DatagramSessionImpl) session)) {
+ Selector selector = DatagramAcceptor.this.selector;
+ if (selector != null) {
+ selector.wakeup();
+ }
}
}
+
+ public void remove(IoSession session) {
+ getListeners().fireSessionDestroyed(session);
+ }
+
+ public void updateTrafficMask(IoSession session) {
+ }
}
private synchronized void startupWorker() {
@@ -227,6 +273,15 @@
}
}
+ private boolean scheduleFlush(DatagramSessionImpl session) {
+ if (session.setScheduledForFlush(true)) {
+ flushingSessions.add(session);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
private class Worker implements Runnable {
public void run() {
Thread.currentThread().setName("DatagramAcceptor-" + id);
@@ -241,6 +296,7 @@
processReadySessions(selector.selectedKeys());
}
+ flushSessions();
cancelKeys();
if (selector.keys().isEmpty()) {
@@ -277,6 +333,12 @@
if (key.isReadable()) {
readSession(ch);
}
+
+ if (key.isWritable()) {
+ for (IoSession session : getManagedSessions()) {
+ scheduleFlush((DatagramSessionImpl) session);
+ }
+ }
} catch (Throwable t) {
ExceptionMonitor.getInstance().exceptionCaught(t);
}
@@ -284,55 +346,104 @@
}
private void readSession(DatagramChannel channel) throws Exception {
- final ByteBuffer readBuf = ByteBuffer.allocate(getSessionConfig()
- .getReadBufferSize());
+ ByteBuffer readBuf = ByteBuffer.allocate(getSessionConfig()
+ .getReceiveBufferSize());
- final SocketAddress remoteAddress = channel.receive(readBuf.buf());
+ SocketAddress remoteAddress = channel.receive(readBuf.buf());
if (remoteAddress != null) {
+ DatagramSessionImpl session = (DatagramSessionImpl)
newSessionWithoutLock(remoteAddress);
+
readBuf.flip();
- Object data;
- ConnectFuture future = null;
- synchronized (cache) {
- data = cache.get(remoteAddress);
- if (data == null) {
- future = connector.connect(remoteAddress,
getLocalAddress());
- cache.put(remoteAddress, future);
- }
- }
-
- if (data == null) {
- future.addListener(new IoFutureListener() {
- public void operationComplete(IoFuture future) {
- ConnectFuture f = (ConnectFuture) future;
- if (f.getException() == null) {
- IoSession s = f.getSession();
- cache.put(remoteAddress, s);
- s.getCloseFuture().addListener(new
IoFutureListener() {
- public void operationComplete(IoFuture future)
{
- cache.remove(remoteAddress);
- }
- });
- s.getFilterChain().fireMessageReceived(readBuf);
- } else {
-
ExceptionMonitor.getInstance().exceptionCaught(f.getException());
- }
- }
- });
- } else if (data instanceof ConnectFuture) {
- future = (ConnectFuture) data;
- future.addListener(new IoFutureListener() {
- public void operationComplete(IoFuture future) {
- ConnectFuture f = (ConnectFuture) future;
- if (f.getException() == null) {
- IoSession s = f.getSession();
- s.getFilterChain().fireMessageReceived(readBuf);
- }
- }
- });
- } else if (data instanceof IoSession) {
- ((IoSession)
data).getFilterChain().fireMessageReceived(readBuf);
+
+ ByteBuffer newBuf = ByteBuffer.allocate(readBuf.limit());
+ newBuf.put(readBuf);
+ newBuf.flip();
+
+ session.increaseReadBytes(newBuf.remaining());
+ session.getFilterChain().fireMessageReceived(newBuf);
+ }
+ }
+
+ private void flushSessions() {
+ for (; ;) {
+ DatagramSessionImpl session = flushingSessions.poll();
+ if (session == null) {
+ break;
+ }
+
+ session.setScheduledForFlush(false);
+
+ try {
+ boolean flushedAll = flush(session);
+ if (flushedAll && !session.getWriteRequestQueue().isEmpty() &&
!session.isScheduledForFlush()) {
+ scheduleFlush(session);
+ }
+ } catch (IOException e) {
+ session.getFilterChain().fireExceptionCaught(e);
+ }
+ }
+ }
+
+ private boolean flush(DatagramSessionImpl session) throws IOException {
+ // Clear OP_WRITE
+ SelectionKey key = session.getSelectionKey();
+ if (key == null) {
+ scheduleFlush(session);
+ return false;
+ }
+ if (!key.isValid()) {
+ return false;
+ }
+ key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
+
+ DatagramChannel ch = session.getChannel();
+ Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
+
+ int writtenBytes = 0;
+ int maxWrittenBytes = session.getConfig().getSendBufferSize() << 1;
+ try {
+ for (; ;) {
+ WriteRequest req = writeRequestQueue.peek();
+ if (req == null) {
+ break;
+ }
+
+ ByteBuffer buf = (ByteBuffer) req.getMessage();
+ if (buf.remaining() == 0) {
+ // pop and fire event
+ writeRequestQueue.poll();
+ session.increaseWrittenMessages();
+ buf.reset();
+ session.getFilterChain().fireMessageSent(req);
+ continue;
+ }
+
+ SocketAddress destination = req.getDestination();
+ if (destination == null) {
+ destination = session.getRemoteAddress();
+ }
+
+ int localWrittenBytes = ch.send(buf.buf(), destination);
+ if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes)
{
+ // Kernel buffer is full or wrote too much
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ return false;
+ } else {
+ key.interestOps(key.interestOps() &
~SelectionKey.OP_WRITE);
+
+ // pop and fire event
+ writeRequestQueue.poll();
+ writtenBytes += localWrittenBytes;
+ session.increaseWrittenMessages();
+ buf.reset();
+ session.getFilterChain().fireMessageSent(req);
+ }
}
+ } finally {
+ session.increaseWrittenBytes(writtenBytes);
}
+
+ return true;
}
private void registerNew() {
@@ -341,8 +452,8 @@
}
for (; ;) {
- ServiceOperationFuture future = registerQueue.poll();
- if (future == null) {
+ ServiceOperationFuture req = registerQueue.poll();
+ if (req == null) {
break;
}
@@ -361,14 +472,15 @@
ch.configureBlocking(false);
ch.socket().bind(getLocalAddress());
- ch.register(selector, SelectionKey.OP_READ, future);
+ ch.register(selector, SelectionKey.OP_READ, req);
this.channel = ch;
- future.setDone();
+ getListeners().fireServiceActivated();
+ req.setDone();
} catch (Exception e) {
- future.setException(e);
+ req.setException(e);
} finally {
- if (ch != null && future.getException() != null) {
+ if (ch != null && req.getException() != null) {
try {
ch.disconnect();
ch.close();
@@ -382,8 +494,8 @@
private void cancelKeys() {
for (; ;) {
- ServiceOperationFuture future = cancelQueue.poll();
- if (future == null) {
+ ServiceOperationFuture request = cancelQueue.poll();
+ if (request == null) {
break;
}
@@ -400,8 +512,8 @@
} catch (Throwable t) {
ExceptionMonitor.getInstance().exceptionCaught(t);
} finally {
- future.setDone();
getListeners().fireServiceDeactivated();
+ request.setDone();
}
}
}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java?rev=579942&r1=579941&r2=579942&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
Thu Sep 27 01:32:13 2007
@@ -28,13 +28,8 @@
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.DefaultConnectFuture;
import org.apache.mina.common.DefaultIoFilterChain;
-import org.apache.mina.common.DefaultIoFilterChainBuilder;
import org.apache.mina.common.ExceptionMonitor;
import org.apache.mina.common.IoConnector;
-import org.apache.mina.common.IoFilterChainBuilder;
-import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.IoService;
-import org.apache.mina.common.IoServiceListenerSupport;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.TransportMetadata;
import org.apache.mina.transport.socket.DatagramSessionConfig;
@@ -53,7 +48,7 @@
private static volatile int nextId = 0;
private final int id = nextId++;
- private final IoService parent;
+ private final String threadName = "DatagramConnector-" + id;
private final int processorCount;
private final NioProcessor[] ioProcessors;
@@ -77,22 +72,8 @@
* Creates a new instance.
*/
public DatagramConnector(int processorCount, Executor executor) {
- this(null, null, processorCount, executor);
- }
-
- DatagramConnector(
- IoService parent, String threadNamePrefix, int processorCount,
Executor executor) {
super(new DefaultDatagramSessionConfig());
- // DotagramAcceptor can use DatagramConnector as a child.
- if (parent == null) {
- parent = this;
- }
- if (threadNamePrefix == null) {
- threadNamePrefix = "DatagramConnector-" + id;
- }
- this.parent = parent;
-
if (processorCount < 1) {
throw new IllegalArgumentException(
"Must have at least one processor");
@@ -105,7 +86,7 @@
// handling sessions.
for (int i = 0; i < processorCount; i++) {
ioProcessors[i] = new NioProcessor(
- threadNamePrefix + '.' + i, executor);
+ threadName + '.' + i, executor);
}
}
@@ -122,66 +103,8 @@
}
@Override
- protected IoServiceListenerSupport getListeners() {
- if (parent == this) {
- return super.getListeners();
- } else {
- return ((DatagramAcceptor) parent).getListeners();
- }
- }
-
- @Override
public DatagramSessionConfig getSessionConfig() {
- if (parent == this) {
- return (DatagramSessionConfig) super.getSessionConfig();
- } else {
- return (DatagramSessionConfig) parent.getSessionConfig();
- }
- }
-
- @Override
- public DefaultIoFilterChainBuilder getFilterChain() {
- if (parent == this) {
- return super.getFilterChain();
- } else {
- return parent.getFilterChain();
- }
- }
-
- @Override
- public IoFilterChainBuilder getFilterChainBuilder() {
- if (parent == this) {
- return super.getFilterChainBuilder();
- } else {
- return parent.getFilterChainBuilder();
- }
- }
-
- @Override
- public void setFilterChainBuilder(IoFilterChainBuilder builder) {
- if (parent == this) {
- super.setFilterChainBuilder(builder);
- } else {
- parent.setFilterChainBuilder(builder);
- }
- }
-
- @Override
- public IoHandler getHandler() {
- if (parent == this) {
- return super.getHandler();
- } else {
- return parent.getHandler();
- }
- }
-
- @Override
- public void setHandler(IoHandler handler) {
- if (parent == this) {
- super.setHandler(handler);
- } else {
- parent.setHandler(handler);
- }
+ return (DatagramSessionConfig) super.getSessionConfig();
}
@Override
@@ -202,7 +125,7 @@
ch.connect(remoteAddress);
NioProcessor processor = nextProcessor();
- session = new DatagramSessionImpl(parent, ch, processor);
+ session = new DatagramSessionImpl(this, ch, processor);
ConnectFuture future = new DefaultConnectFuture();
// DefaultIoFilterChain will notify the connect future.
session.setAttribute(DefaultIoFilterChain.CONNECT_FUTURE, future);
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java?rev=579942&r1=579941&r2=579942&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java
Thu Sep 27 01:32:13 2007
@@ -74,20 +74,28 @@
private SelectionKey key;
/**
- * Creates a new connector instance.
+ * Creates a new acceptor-side session instance.
*/
DatagramSessionImpl(IoService service,
- DatagramChannel ch, IoProcessor processor) {
+ DatagramChannel ch, IoProcessor processor,
+ SocketAddress remoteAddress) {
this.service = service;
this.ch = ch;
this.handler = service.getHandler();
this.processor = processor;
- this.remoteAddress = (InetSocketAddress) ch.socket()
- .getRemoteSocketAddress();
+ this.remoteAddress = (InetSocketAddress) remoteAddress;
this.localAddress = (InetSocketAddress) ch.socket()
.getLocalSocketAddress();
this.config.setAll(service.getSessionConfig());
+ }
+
+ /**
+ * Creates a new connector-side session instance.
+ */
+ DatagramSessionImpl(IoService service,
+ DatagramChannel ch, IoProcessor processor) {
+ this(service, ch, processor, ch.socket().getRemoteSocketAddress());
}
public IoService getService() {
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java?rev=579942&r1=579941&r2=579942&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
Thu Sep 27 01:32:13 2007
@@ -170,50 +170,28 @@
}
}
- /**
- * @see org.apache.mina.common.IoService#getTransportMetadata()
- */
public TransportMetadata getTransportMetadata() {
return SocketSessionImpl.METADATA;
}
- /**
- * @see org.apache.mina.common.AbstractIoService#getSessionConfig()
- */
@Override
public SocketSessionConfig getSessionConfig() {
return (SocketSessionConfig) super.getSessionConfig();
}
- /**
- * @see org.apache.mina.common.AbstractIoAcceptor#getLocalAddress()
- */
@Override
public InetSocketAddress getLocalAddress() {
return (InetSocketAddress) super.getLocalAddress();
}
- // This method is added to work around a problem with
- // bean property access mechanism.
-
- /**
- * @see
org.apache.mina.common.AbstractIoAcceptor#setLocalAddress(java.net.SocketAddress)
- * @param localAddress the local address
- */
public void setLocalAddress(InetSocketAddress localAddress) {
- super.setLocalAddress(localAddress);
+ setLocalAddress((SocketAddress) localAddress);
}
- /**
- * @see ServerSocket#getReuseAddress()
- */
public boolean isReuseAddress() {
return reuseAddress;
}
- /**
- * @see ServerSocket#setReuseAddress(boolean)
- */
public void setReuseAddress(boolean reuseAddress) {
synchronized (bindLock) {
if (isBound()) {
@@ -225,21 +203,10 @@
}
}
- /**
- * Returns the size of the backlog.
- *
- * @return
- */
public int getBacklog() {
return backlog;
}
- /**
- * Sets the size of the backlog. This can only be done when this
- * class is not bound
- *
- * @param backlog
- */
public void setBacklog(int backlog) {
synchronized (bindLock) {
if (isBound()) {
Added:
mina/trunk/core/src/main/java/org/apache/mina/util/ExpirationListener.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/util/ExpirationListener.java?rev=579942&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/util/ExpirationListener.java
(added)
+++ mina/trunk/core/src/main/java/org/apache/mina/util/ExpirationListener.java
Thu Sep 27 01:32:13 2007
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.util;
+
+/**
+ * A listener for expired object events.
+ *
+ * @author The Apache Directory Project ([EMAIL PROTECTED])
+ * TODO Make this an inner interface of ExpiringMap
+ */
+public interface ExpirationListener<E> {
+ void expired(E expiredObject);
+}
Propchange:
mina/trunk/core/src/main/java/org/apache/mina/util/ExpirationListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
mina/trunk/core/src/main/java/org/apache/mina/util/ExpirationListener.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: mina/trunk/core/src/main/java/org/apache/mina/util/ExpiringMap.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/util/ExpiringMap.java?rev=579942&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/util/ExpiringMap.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/util/ExpiringMap.java Thu Sep
27 01:32:13 2007
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.util;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A map with expiration.
+ *
+ * @author The Apache Directory Project ([EMAIL PROTECTED])
+ */
+public class ExpiringMap<K, V> implements Map<K, V> {
+ public static final int DEFAULT_TIME_TO_LIVE = 60;
+
+ public static final int DEFAULT_EXPIRATION_INTERVAL = 1;
+
+ private static volatile int expirerCount = 1;
+
+ private final ConcurrentHashMap<K, ExpiringObject> delegate;
+
+ private final CopyOnWriteArrayList<ExpirationListener<V>> expirationListeners;
+
+ private final Expirer expirer;
+
+ public ExpiringMap() {
+ this(DEFAULT_TIME_TO_LIVE, DEFAULT_EXPIRATION_INTERVAL);
+ }
+
+ public ExpiringMap(int timeToLive) {
+ this(timeToLive, DEFAULT_EXPIRATION_INTERVAL);
+ }
+
+ public ExpiringMap(int timeToLive, int expirationInterval) {
+ this(new ConcurrentHashMap<K, ExpiringObject>(),
+ new CopyOnWriteArrayList<ExpirationListener<V>>(), timeToLive,
+ expirationInterval);
+ }
+
+ private ExpiringMap(ConcurrentHashMap<K, ExpiringObject> delegate,
+ CopyOnWriteArrayList<ExpirationListener<V>> expirationListeners,
+ int timeToLive, int expirationInterval) {
+ this.delegate = delegate;
+ this.expirationListeners = expirationListeners;
+
+ this.expirer = new Expirer();
+ expirer.setTimeToLive(timeToLive);
+ expirer.setExpirationInterval(expirationInterval);
+ }
+
+ public V put(K key, V value) {
+ ExpiringObject answer = delegate.put(key, new ExpiringObject(key,
+ value, System.currentTimeMillis()));
+ if (answer == null) {
+ return null;
+ }
+
+ return answer.getValue();
+ }
+
+ public V get(Object key) {
+ ExpiringObject object = delegate.get(key);
+
+ if (object != null) {
+ object.setLastAccessTime(System.currentTimeMillis());
+
+ return object.getValue();
+ }
+
+ return null;
+ }
+
+ public V remove(Object key) {
+ ExpiringObject answer = delegate.remove(key);
+ if (answer == null) {
+ return null;
+ }
+
+ return answer.getValue();
+ }
+
+ public boolean containsKey(Object key) {
+ return delegate.containsKey(key);
+ }
+
+ public boolean containsValue(Object value) {
+ return delegate.containsValue(value);
+ }
+
+ public int size() {
+ return delegate.size();
+ }
+
+ public boolean isEmpty() {
+ return delegate.isEmpty();
+ }
+
+ public void clear() {
+ delegate.clear();
+ }
+
+ @Override
+ public int hashCode() {
+ return delegate.hashCode();
+ }
+
+ public Set<K> keySet() {
+ return delegate.keySet();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return delegate.equals(obj);
+ }
+
+ public void putAll(Map<? extends K, ? extends V> inMap) {
+ for (Entry<? extends K, ? extends V> e : inMap.entrySet()) {
+ this.put(e.getKey(), e.getValue());
+ }
+ }
+
+ public Collection<V> values() {
+ throw new UnsupportedOperationException();
+ }
+
+ public Set<Map.Entry<K, V>> entrySet() {
+ throw new UnsupportedOperationException();
+ }
+
+ public void addExpirationListener(ExpirationListener<V> listener) {
+ expirationListeners.add(listener);
+ }
+
+ public void removeExpirationListener(
+ ExpirationListener<V> listener) {
+ expirationListeners.remove(listener);
+ }
+
+ public Expirer getExpirer() {
+ return expirer;
+ }
+
+ public int getExpirationInterval() {
+ return expirer.getExpirationInterval();
+ }
+
+ public int getTimeToLive() {
+ return expirer.getTimeToLive();
+ }
+
+ public void setExpirationInterval(int expirationInterval) {
+ expirer.setExpirationInterval(expirationInterval);
+ }
+
+ public void setTimeToLive(int timeToLive) {
+ expirer.setTimeToLive(timeToLive);
+ }
+
+ private class ExpiringObject {
+ private K key;
+
+ private V value;
+
+ private long lastAccessTime;
+
+ private final ReadWriteLock lastAccessTimeLock = new ReentrantReadWriteLock();
+
+ ExpiringObject(K key, V value, long lastAccessTime) {
+ if (value == null) {
+ throw new IllegalArgumentException(
+ "An expiring object cannot be null.");
+ }
+
+ this.key = key;
+ this.value = value;
+ this.lastAccessTime = lastAccessTime;
+ }
+
+ public long getLastAccessTime() {
+ lastAccessTimeLock.readLock().lock();
+
+ try {
+ return lastAccessTime;
+ } finally {
+ lastAccessTimeLock.readLock().unlock();
+ }
+ }
+
+ public void setLastAccessTime(long lastAccessTime) {
+ lastAccessTimeLock.writeLock().lock();
+
+ try {
+ this.lastAccessTime = lastAccessTime;
+ } finally {
+ lastAccessTimeLock.writeLock().unlock();
+ }
+ }
+
+ public K getKey() {
+ return key;
+ }
+
+ public V getValue() {
+ return value;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return value.equals(obj);
+ }
+
+ @Override
+ public int hashCode() {
+ return value.hashCode();
+ }
+ }
+
+ public class Expirer implements Runnable {
+ private final ReadWriteLock stateLock = new ReentrantReadWriteLock();
+
+ private long timeToLiveMillis;
+
+ private long expirationIntervalMillis;
+
+ private boolean running = false;
+
+ private final Thread expirerThread;
+
+ public Expirer() {
+ expirerThread = new Thread(this, "ExpiringMapExpirer-"
+ + expirerCount++);
+ expirerThread.setDaemon(true);
+ }
+
+        /**
+         * Worker loop: runs one expiration pass, sleeps for the configured
+         * expiration interval, and repeats until the expirer is stopped.
+         */
+        public void run() {
+            // Go through isRunning() so the flag is read under stateLock, the
+            // same lock stopExpiring() holds when it clears the flag.
+            while (isRunning()) {
+                processExpires();
+
+                // Read the interval under the lock that guards its writes.
+                long sleepMillis;
+                stateLock.readLock().lock();
+                try {
+                    sleepMillis = expirationIntervalMillis;
+                } finally {
+                    stateLock.readLock().unlock();
+                }
+
+                try {
+                    Thread.sleep(sleepMillis);
+                } catch (InterruptedException e) {
+                    // stopExpiring() interrupts this thread to cut the sleep
+                    // short; the loop condition decides whether to exit.
+                }
+            }
+        }
+
+        /**
+         * Performs a single expiration pass over the delegate map. Every
+         * entry that has been idle for at least the time-to-live is removed
+         * and each registered {@link ExpirationListener} is notified with the
+         * removed value. A non-positive time-to-live disables expiration.
+         */
+        private void processExpires() {
+            // Take one snapshot of the time-to-live under the lock that
+            // guards it, instead of re-reading the field for every entry.
+            long ttlMillis;
+            stateLock.readLock().lock();
+            try {
+                ttlMillis = timeToLiveMillis;
+            } finally {
+                stateLock.readLock().unlock();
+            }
+
+            if (ttlMillis <= 0) {
+                return;
+            }
+
+            long timeNow = System.currentTimeMillis();
+
+            for (ExpiringObject o : delegate.values()) {
+                long timeIdle = timeNow - o.getLastAccessTime();
+
+                if (timeIdle >= ttlMillis) {
+                    delegate.remove(o.getKey());
+
+                    for (ExpirationListener<V> listener : expirationListeners) {
+                        listener.expired(o.getValue());
+                    }
+                }
+            }
+        }
+
+ public void startExpiring() {
+ stateLock.writeLock().lock();
+
+ try {
+ if (!running) {
+ running = true;
+ expirerThread.start();
+ }
+ } finally {
+ stateLock.writeLock().unlock();
+ }
+ }
+
+ public void startExpiringIfNotStarted() {
+ stateLock.readLock().lock();
+ try {
+ if (running) {
+ return;
+ }
+ } finally {
+ stateLock.readLock().unlock();
+ }
+
+ stateLock.writeLock().lock();
+ try {
+ if (!running) {
+ running = true;
+ expirerThread.start();
+ }
+ } finally {
+ stateLock.writeLock().unlock();
+ }
+ }
+
+ public void stopExpiring() {
+ stateLock.writeLock().lock();
+
+ try {
+ if (running) {
+ running = false;
+ expirerThread.interrupt();
+ }
+ } finally {
+ stateLock.writeLock().unlock();
+ }
+ }
+
+ public boolean isRunning() {
+ stateLock.readLock().lock();
+
+ try {
+ return running;
+ } finally {
+ stateLock.readLock().unlock();
+ }
+ }
+
+        /**
+         * Returns the time-to-live in whole seconds.
+         * <p>
+         * The division is done on the {@code long} millisecond value before
+         * narrowing to {@code int}. Casting first would truncate the
+         * millisecond count and give a wrong result once it exceeds
+         * {@link Integer#MAX_VALUE} (roughly 24.8 days).
+         *
+         * @return the time-to-live, in seconds
+         */
+        public int getTimeToLive() {
+            stateLock.readLock().lock();
+
+            try {
+                return (int) (timeToLiveMillis / 1000);
+            } finally {
+                stateLock.readLock().unlock();
+            }
+        }
+
+ public void setTimeToLive(long timeToLive) {
+ stateLock.writeLock().lock();
+
+ try {
+ this.timeToLiveMillis = timeToLive * 1000;
+ } finally {
+ stateLock.writeLock().unlock();
+ }
+ }
+
+        /**
+         * Returns the interval between expiration passes in whole seconds.
+         * <p>
+         * The division is done on the {@code long} millisecond value before
+         * narrowing to {@code int}. Casting first would truncate the
+         * millisecond count and give a wrong result once it exceeds
+         * {@link Integer#MAX_VALUE} (roughly 24.8 days).
+         *
+         * @return the expiration interval, in seconds
+         */
+        public int getExpirationInterval() {
+            stateLock.readLock().lock();
+
+            try {
+                return (int) (expirationIntervalMillis / 1000);
+            } finally {
+                stateLock.readLock().unlock();
+            }
+        }
+
+ public void setExpirationInterval(long expirationInterval) {
+ stateLock.writeLock().lock();
+
+ try {
+ this.expirationIntervalMillis = expirationInterval * 1000;
+ } finally {
+ stateLock.writeLock().unlock();
+ }
+ }
+ }
+}
Propchange: mina/trunk/core/src/main/java/org/apache/mina/util/ExpiringMap.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: mina/trunk/core/src/main/java/org/apache/mina/util/ExpiringMap.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
mina/trunk/core/src/test/java/org/apache/mina/transport/AbstractTrafficControlTest.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/transport/AbstractTrafficControlTest.java?rev=579942&r1=579941&r2=579942&view=diff
==============================================================================
---
mina/trunk/core/src/test/java/org/apache/mina/transport/AbstractTrafficControlTest.java
(original)
+++
mina/trunk/core/src/test/java/org/apache/mina/transport/AbstractTrafficControlTest.java
Thu Sep 27 01:32:13 2007
@@ -92,6 +92,8 @@
assertEquals("1", getSent(session));
session.suspendRead();
+
+ Thread.sleep(100);
write(session, "2");
assertFalse(canRead(session));
@@ -100,6 +102,8 @@
session.suspendWrite();
+ Thread.sleep(100);
+
write(session, "3");
assertFalse(canRead(session));
assertEquals("1", getReceived(session));
@@ -107,6 +111,8 @@
session.resumeRead();
+ Thread.sleep(100);
+
write(session, "4");
assertEquals('2', read(session));
assertEquals("12", getReceived(session));
@@ -114,6 +120,8 @@
session.resumeWrite();
+ Thread.sleep(100);
+
assertEquals('3', read(session));
assertEquals('4', read(session));
@@ -124,6 +132,8 @@
session.suspendWrite();
+ Thread.sleep(100);
+
write(session, "6");
assertFalse(canRead(session));
assertEquals("12345", getReceived(session));
@@ -132,12 +142,17 @@
session.suspendRead();
session.resumeWrite();
+ Thread.sleep(100);
+
write(session, "7");
assertFalse(canRead(session));
assertEquals("12345", getReceived(session));
assertEquals("1234567", getSent(session));
session.resumeRead();
+
+ Thread.sleep(100);
+
assertEquals('6', read(session));
assertEquals('7', read(session));
Added:
mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java?rev=579942&view=auto
==============================================================================
---
mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java
(added)
+++
mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java
Thu Sep 27 01:32:13 2007
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.net.InetSocketAddress;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.ExpiringSessionRecycler;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.util.AvailablePortFinder;
+
+/**
+ * Tests if datagram sessions are recycled properly.
+ *
+ * @author The Apache MINA Project ([EMAIL PROTECTED])
+ * @version $Rev$, $Date$
+ */
+public class DatagramRecyclerTest extends TestCase {
+ private final DatagramAcceptor acceptor = new DatagramAcceptor();
+
+ private final DatagramConnector connector = new DatagramConnector();
+
+ public DatagramRecyclerTest() {
+ }
+
+ public void testDatagramRecycler() throws Exception {
+ int port = AvailablePortFinder.getNextAvailable(1024);
+ ExpiringSessionRecycler recycler = new ExpiringSessionRecycler(1, 1);
+
+ MockHandler acceptorHandler = new MockHandler();
+ MockHandler connectorHandler = new MockHandler();
+
+ acceptor.setLocalAddress(new InetSocketAddress(port));
+ acceptor.setHandler(acceptorHandler);
+ acceptor.setSessionRecycler(recycler);
+ acceptor.bind();
+
+ try {
+ connector.setHandler(connectorHandler);
+ ConnectFuture future = connector.connect(new InetSocketAddress(
+ "localhost", port));
+ future.awaitUninterruptibly();
+
+ // Write whatever to trigger the acceptor.
+ future.getSession().write(ByteBuffer.allocate(1))
+ .awaitUninterruptibly();
+
+ // Close the client-side connection.
+ // This doesn't mean that the acceptor-side connection is also closed.
+ // The life cycle of the acceptor-side connection is managed by the recycler.
+ future.getSession().close();
+ future.getSession().getCloseFuture().awaitUninterruptibly();
+ Assert.assertTrue(future.getSession().getCloseFuture().isClosed());
+
+ // Wait until the acceptor-side connection is closed.
+ while (acceptorHandler.session == null) {
+ Thread.yield();
+ }
+ acceptorHandler.session.getCloseFuture().awaitUninterruptibly(3000);
+
+ // Is it closed?
+ Assert.assertTrue(acceptorHandler.session.getCloseFuture()
+ .isClosed());
+
+ Thread.sleep(1000);
+
+ Assert.assertEquals("CROPSECL", connectorHandler.result);
+ Assert.assertEquals("CROPRECL", acceptorHandler.result);
+ } finally {
+ acceptor.unbind();
+ }
+ }
+
+    /**
+     * Handler that records the sequence of I/O events as a string of
+     * two-letter codes (for example {@code "CROPSECL"}) and remembers the
+     * session passed to the most recent callback.
+     * <p>
+     * The fields are {@code volatile} because the test thread polls them
+     * (it spins until {@code session} is non-null) while the callbacks are
+     * presumably invoked from the transport's own threads. Without
+     * {@code volatile} the polling thread is not guaranteed to see the
+     * update. Note that {@code volatile} only provides visibility;
+     * {@code result += ...} is still not an atomic operation.
+     */
+    private class MockHandler extends IoHandlerAdapter {
+        /** Session passed to the most recent callback, or null if none yet. */
+        public volatile IoSession session;
+
+        /** Concatenated two-letter event codes, in callback order. */
+        public volatile String result = "";
+
+        /** Records "CA" for a caught exception. */
+        @Override
+        public void exceptionCaught(IoSession session, Throwable cause)
+                throws Exception {
+            this.session = session;
+            result += "CA";
+        }
+
+        /** Records "RE" for a received message. */
+        @Override
+        public void messageReceived(IoSession session, Object message)
+                throws Exception {
+            this.session = session;
+            result += "RE";
+        }
+
+        /** Records "SE" for a sent message. */
+        @Override
+        public void messageSent(IoSession session, Object message)
+                throws Exception {
+            this.session = session;
+            result += "SE";
+        }
+
+        /** Records "CL" for a closed session. */
+        @Override
+        public void sessionClosed(IoSession session) throws Exception {
+            this.session = session;
+            result += "CL";
+        }
+
+        /** Records "CR" for a created session. */
+        @Override
+        public void sessionCreated(IoSession session) throws Exception {
+            this.session = session;
+            result += "CR";
+        }
+
+        /** Records "ID" for an idle notification. */
+        @Override
+        public void sessionIdle(IoSession session, IdleStatus status)
+                throws Exception {
+            this.session = session;
+            result += "ID";
+        }
+
+        /** Records "OP" for an opened session. */
+        @Override
+        public void sessionOpened(IoSession session) throws Exception {
+            this.session = session;
+            result += "OP";
+        }
+
+    }
+}
Propchange:
mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date