Updated Branches: refs/heads/trunk d8de7417c -> ea688dc8d
NIO UDP client (WIP) Project: http://git-wip-us.apache.org/repos/asf/mina/repo Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/ea688dc8 Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/ea688dc8 Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/ea688dc8 Branch: refs/heads/trunk Commit: ea688dc8dde370b880a6500639d803c0491e6b3b Parents: d8de741 Author: jvermillard <[email protected]> Authored: Sun May 19 13:10:42 2013 +0200 Committer: jvermillard <[email protected]> Committed: Sun May 19 13:10:42 2013 +0200 ---------------------------------------------------------------------- .../org/apache/mina/service/AbstractIoService.java | 5 + .../mina/service/server/AbstractIoServer.java | 2 - .../apache/mina/transport/nio/NioSelectorLoop.java | 7 +- .../mina/transport/nio/tcp/NioTcpSession.java | 8 +- .../mina/transport/nio/udp/NioUdpClient.java | 90 +++++++++--- .../mina/transport/nio/udp/NioUdpSession.java | 118 ++++++++++----- .../mina/transport/tcp/AbstractTcpClient.java | 5 +- .../mina/transport/udp/AbstractUdpClient.java | 9 + .../transport/tcp/NioTcpClientFilterEventTest.java | 8 +- 9 files changed, 176 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina/blob/ea688dc8/core/src/main/java/org/apache/mina/service/AbstractIoService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/mina/service/AbstractIoService.java b/core/src/main/java/org/apache/mina/service/AbstractIoService.java index 2046d4a..7099b3d 100644 --- a/core/src/main/java/org/apache/mina/service/AbstractIoService.java +++ b/core/src/main/java/org/apache/mina/service/AbstractIoService.java @@ -26,6 +26,7 @@ import org.apache.mina.api.IoFilter; import org.apache.mina.api.IoHandler; import org.apache.mina.api.IoService; import org.apache.mina.api.IoSession; +import org.apache.mina.api.IoSessionConfig; import org.apache.mina.service.executor.IoHandlerExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +37,7 @@ import org.slf4j.LoggerFactory; * @author <a href="http://mina.apache.org">Apache MINA Project</a> */ public abstract class AbstractIoService implements IoService { + /** A logger for this class */ static final Logger LOG = LoggerFactory.getLogger(AbstractIoService.class); @@ -45,6 +47,9 @@ public abstract class AbstractIoService implements IoService { /** The placeholder of managed open sessions */ private final Map<Long, IoSession> managedSessions = new ConcurrentHashMap<Long, IoSession>(); + /** the default session configuration */ + protected IoSessionConfig config; + /** The high level business logic */ private IoHandler handler; http://git-wip-us.apache.org/repos/asf/mina/blob/ea688dc8/core/src/main/java/org/apache/mina/service/server/AbstractIoServer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/mina/service/server/AbstractIoServer.java b/core/src/main/java/org/apache/mina/service/server/AbstractIoServer.java index 863e49c..9b2bbf2 100644 --- a/core/src/main/java/org/apache/mina/service/server/AbstractIoServer.java +++ b/core/src/main/java/org/apache/mina/service/server/AbstractIoServer.java @@ -30,8 +30,6 @@ import org.apache.mina.service.executor.IoHandlerExecutor; * @author <a href="http://mina.apache.org">Apache MINA Project</a> */ public abstract class AbstractIoServer extends AbstractIoService implements IoServer { - /** the default session configuration */ - protected IoSessionConfig config; /** * Create an new AbstractIoServer instance http://git-wip-us.apache.org/repos/asf/mina/blob/ea688dc8/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java b/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java index 51bd5dd..d3b7ab1 100644 --- a/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java +++ b/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java @@ -134,7 +134,7 @@ public class NioSelectorLoop implements SelectorLoop { public void modifyRegistration(boolean accept, boolean read, boolean write, final SelectorListener listener, SelectableChannel channel, boolean wakeup) { logger.debug("modifying registration : {} for accept : {}, read : {}, write : {}, channel : {}", new Object[] { - listener, accept, read, write, channel }); + listener, accept, read, write, channel }); final SelectionKey key = channel.keyFor(selector); if (key == null) { @@ -207,7 +207,6 @@ public class NioSelectorLoop implements SelectorLoop { while (it.hasNext()) { final SelectionKey key = it.next(); final SelectorListener listener = (SelectorListener) key.attachment(); - logger.debug("key : {}", key); int ops = key.readyOps(); boolean isAcceptable = (ops & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT; boolean isConnectable = (ops & SelectionKey.OP_CONNECT) == SelectionKey.OP_CONNECT; @@ -215,7 +214,8 @@ public class NioSelectorLoop implements SelectorLoop { boolean isWritable = (ops & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE; listener.ready(isAcceptable, isConnectable, isReadable, isReadable ? readBuffer : null, isWritable); - // if you don't remove the event of the set, the selector will present you this event again and + // if you don't remove the event of the set, the selector will present you this event again + // and // again logger.debug("remove"); it.remove(); @@ -270,6 +270,7 @@ public class NioSelectorLoop implements SelectorLoop { return callback; } + @Override public String toString() { StringBuilder sb = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/mina/blob/ea688dc8/core/src/main/java/org/apache/mina/transport/nio/tcp/NioTcpSession.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/mina/transport/nio/tcp/NioTcpSession.java b/core/src/main/java/org/apache/mina/transport/nio/tcp/NioTcpSession.java index 9f863f0..9e15885 100644 --- a/core/src/main/java/org/apache/mina/transport/nio/tcp/NioTcpSession.java +++ b/core/src/main/java/org/apache/mina/transport/nio/tcp/NioTcpSession.java @@ -69,7 +69,7 @@ public class NioTcpSession extends AbstractIoSession implements SelectorListener /** The size of the buffer configured in the socket to send data */ private int sendBufferSize; - /* No qualifier*/NioTcpSession(final IoService service, final SocketChannel channel, + /* No qualifier */NioTcpSession(final IoService service, final SocketChannel channel, final SelectorLoop selectorLoop, final IdleChecker idleChecker) { super(service, channel, idleChecker); this.selectorLoop = selectorLoop; @@ -340,8 +340,10 @@ public class NioTcpSession extends AbstractIoSession implements SelectorListener @Override public void ready(final boolean accept, boolean connect, final boolean read, final ByteBuffer readBuffer, final boolean write) { - LOG.debug("session {} ready for accept={}, connect={}, read={}, write={}", new Object[] { this, accept, - connect, read, write }); + if (LOG.isDebugEnabled()) { + LOG.debug("session {} ready for accept={}, connect={}, read={}, write={}", new Object[] { this, accept, + connect, read, write }); + } if (connect) { try { http://git-wip-us.apache.org/repos/asf/mina/blob/ea688dc8/core/src/main/java/org/apache/mina/transport/nio/udp/NioUdpClient.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/mina/transport/nio/udp/NioUdpClient.java b/core/src/main/java/org/apache/mina/transport/nio/udp/NioUdpClient.java index 8ccfd98..9ab4338 100644 --- a/core/src/main/java/org/apache/mina/transport/nio/udp/NioUdpClient.java +++ b/core/src/main/java/org/apache/mina/transport/nio/udp/NioUdpClient.java @@ -23,23 +23,31 @@ import java.io.IOException; import java.net.SocketAddress; import java.nio.channels.DatagramChannel; +import org.apache.mina.api.IdleStatus; import org.apache.mina.api.IoFuture; import org.apache.mina.api.IoSession; -import org.apache.mina.api.IoSessionConfig; import org.apache.mina.service.executor.IoHandlerExecutor; +import org.apache.mina.service.idlechecker.IndexedIdleChecker; +import org.apache.mina.transport.nio.ConnectFuture; +import org.apache.mina.transport.nio.FixedSelectorLoopPool; import org.apache.mina.transport.nio.NioSelectorLoop; -import org.apache.mina.transport.nio.SelectorLoop; +import org.apache.mina.transport.nio.SelectorLoopPool; import org.apache.mina.transport.udp.AbstractUdpClient; +import org.apache.mina.transport.udp.UdpSessionConfig; +import org.apache.mina.util.Assert; /** - * TODO + * This class implements a UDP NIO based client. * * @author <a href="http://mina.apache.org">Apache MINA Project</a> */ public class NioUdpClient extends AbstractUdpClient { - /** the SelectorLoop for connecting the sessions */ + + /** the SelectorLoop for handling read/write session events */ // This is final, so that we know if it's not initialized - private final SelectorLoop connectSelectorLoop; + private final SelectorLoopPool readWriteSelectorPool; + + private final IndexedIdleChecker idleChecker = new IndexedIdleChecker(); /** * Create a new instance of NioUdpClient @@ -53,30 +61,76 @@ public class NioUdpClient extends AbstractUdpClient { */ public NioUdpClient(IoHandlerExecutor ioHandlerExecutor) { super(ioHandlerExecutor); - connectSelectorLoop = new NioSelectorLoop("connect", 0); - } - - @Override - public IoSessionConfig getSessionConfig() { - // TODO Auto-generated method stub - return null; + readWriteSelectorPool = new FixedSelectorLoopPool("Client", 2); + idleChecker.start(); } + /** + * {@inheritDoc} + */ @Override public IoFuture<IoSession> connect(SocketAddress remoteAddress) throws IOException { + Assert.assertNotNull(remoteAddress, "remoteAddress"); + DatagramChannel ch = DatagramChannel.open(); + ch.configureBlocking(false); + + UdpSessionConfig config = getSessionConfig(); + + NioSelectorLoop loop = (NioSelectorLoop) readWriteSelectorPool.getSelectorLoop(); + + final NioUdpSession session = new NioUdpSession(this, idleChecker, ch, null, remoteAddress, loop); + + session.setConnected(); + + // apply idle configuration + session.getConfig().setIdleTimeInMillis(IdleStatus.READ_IDLE, config.getIdleTimeInMillis(IdleStatus.READ_IDLE)); + session.getConfig().setIdleTimeInMillis(IdleStatus.WRITE_IDLE, + config.getIdleTimeInMillis(IdleStatus.WRITE_IDLE)); + + // Manage the Idle status + idleChecker.sessionRead(session, System.currentTimeMillis()); + idleChecker.sessionWritten(session, System.currentTimeMillis()); + + // apply the default service socket configuration - if (remoteAddress != null) { - ch.socket().bind(remoteAddress); - ch.connect(remoteAddress); + Boolean reuseAddress = config.isReuseAddress(); + + if (reuseAddress != null) { + session.getConfig().setReuseAddress(reuseAddress); + } + + Integer readBufferSize = config.getReadBufferSize(); + + if (readBufferSize != null) { + session.getConfig().setReadBufferSize(readBufferSize); + } + + Integer sendBufferSize = config.getSendBufferSize(); + + if (sendBufferSize != null) { + session.getConfig().setSendBufferSize(sendBufferSize); } - return null; + Integer trafficClass = config.getTrafficClass(); + + if (trafficClass != null) { + session.getConfig().setTrafficClass(trafficClass); + } + + loop.register(false, false, true, false, session, ch, null); + + ConnectFuture cf = new ConnectFuture(); + cf.complete(session); + return cf; } + /** + * {@inheritDoc} + */ @Override public IoFuture<IoSession> connect(SocketAddress remoteAddress, SocketAddress localAddress) { - // TODO Auto-generated method stub - return null; + throw new IllegalStateException("not supported for UDP"); } + } http://git-wip-us.apache.org/repos/asf/mina/blob/ea688dc8/core/src/main/java/org/apache/mina/transport/nio/udp/NioUdpSession.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/mina/transport/nio/udp/NioUdpSession.java b/core/src/main/java/org/apache/mina/transport/nio/udp/NioUdpSession.java index 6f099bc..8b703a9 100644 --- a/core/src/main/java/org/apache/mina/transport/nio/udp/NioUdpSession.java +++ b/core/src/main/java/org/apache/mina/transport/nio/udp/NioUdpSession.java @@ -23,14 +23,15 @@ import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; -import java.nio.channels.SelectionKey; import org.apache.mina.api.IoFuture; import org.apache.mina.api.IoService; import org.apache.mina.service.idlechecker.IdleChecker; import org.apache.mina.session.AbstractIoSession; import org.apache.mina.session.WriteRequest; +import org.apache.mina.transport.nio.NioSelectorLoop; import org.apache.mina.transport.nio.SelectorListener; +import org.apache.mina.transport.nio.SelectorLoop; import org.apache.mina.transport.udp.UdpSessionConfig; import org.apache.mina.util.AbstractIoFuture; import org.slf4j.Logger; @@ -49,6 +50,11 @@ public class NioUdpSession extends AbstractIoSession implements SelectorListener private final SocketAddress remoteAddress; + /** + * The selector loop in charge of generating read/write events for this session Used only for UDP client session. + */ + private SelectorLoop selectorLoop = null; + /** the socket configuration */ private final UdpSessionConfig configuration; @@ -66,11 +72,9 @@ public class NioUdpSession extends AbstractIoSession implements SelectorListener }; /** - * @param service - * @param writeProcessor - * @param idleChecker + * For server handled UDP sessions */ - /* No qualifier*/NioUdpSession(IoService service, IdleChecker idleChecker, DatagramChannel datagramChannel, + /* No qualifier */NioUdpSession(IoService service, IdleChecker idleChecker, DatagramChannel datagramChannel, SocketAddress localAddress, SocketAddress remoteAddress) { super(service, datagramChannel, idleChecker); this.localAddress = localAddress; @@ -80,11 +84,34 @@ public class NioUdpSession extends AbstractIoSession implements SelectorListener } /** + * For client handled UDP sessions + */ + /* No qualifier */NioUdpSession(IoService service, IdleChecker idleChecker, DatagramChannel datagramChannel, + SocketAddress localAddress, SocketAddress remoteAddress, NioSelectorLoop selectorLoop) { + super(service, datagramChannel, idleChecker); + this.selectorLoop = selectorLoop; + this.localAddress = localAddress; + this.remoteAddress = remoteAddress; + this.config = service.getSessionConfig(); + this.configuration = (UdpSessionConfig) this.config; + } + + /** * {@inheritDoc} */ @Override protected void channelClose() { - // No inner socket to close for UDP + LOG.debug("channelClose"); + // No inner socket to close for UDP server, but some for UDP client + if (channel != null) { + try { + selectorLoop.unregister(this, channel); + channel.close(); + } catch (final IOException e) { + LOG.error("Exception while closing the channel : ", e); + processException(e); + } + } } /** @@ -114,29 +141,14 @@ public class NioUdpSession extends AbstractIoSession implements SelectorListener /** * {@inheritDoc} */ - @Override - public IoFuture<Void> close(boolean immediately) { - switch (state) { - case CREATED: - LOG.error("Session {} not opened", this); - throw new IllegalStateException("cannot close an not opened session"); - case CONNECTED: - case CLOSING: - if (immediately) { - state = SessionState.CLOSED; - } else { - // we wait for the write queue to be depleted - state = SessionState.CLOSING; - } - break; - case CLOSED: - LOG.warn("Already closed session {}", this); - break; - default: - throw new IllegalStateException("not implemented session state : " + state); - } - return closeFuture; - } + /* + * @Override public IoFuture<Void> close(boolean immediately) { switch (state) { case CREATED: + * LOG.error("Session {} not opened", this); throw new IllegalStateException("cannot close an not opened session"); + * case CONNECTED: case CLOSING: if (immediately) { state = SessionState.CLOSED; } else { // we wait for the write + * queue to be depleted state = SessionState.CLOSING; } break; case CLOSED: LOG.warn("Already closed session {}", + * this); break; default: throw new IllegalStateException("not implemented session state : " + state); } return + * closeFuture; } + */ /** * {@inheritDoc} @@ -260,29 +272,55 @@ public class NioUdpSession extends AbstractIoSession implements SelectorListener return message; } - void setSelectionKey(SelectionKey key) { - //this.selectionKey = key; - } - /** * Set this session status as connected. To be called by the processor selecting/polling this session. */ void setConnected() { if (!isCreated()) { - throw new RuntimeException("Trying to open a non created session"); + throw new IllegalStateException("Trying to open a non created session"); } state = SessionState.CONNECTED; - - /*if (connectFuture != null) { - connectFuture.complete(this); - connectFuture = null; // free some memory - }*/ - processSessionOpen(); } @Override public void ready(boolean accept, boolean connect, boolean read, ByteBuffer readBuffer, boolean write) { + if (LOG.isDebugEnabled()) { + LOG.debug("session {} ready for accept={}, connect={}, read={}, write={}", new Object[] { this, accept, + connect, read, write }); + } + + if (read) { + LOG.debug("readable datagram for UDP service : {}", this); + + // Read everything we can up to the buffer size + try { + ((DatagramChannel) channel).receive(readBuffer); + readBuffer.flip(); + + int readbytes = readBuffer.remaining(); + LOG.debug("read {} bytes", readbytes); + if (readbytes <= 0) { + // session closed by the remote peer + LOG.debug("session closed by the remote peer"); + close(true); + } else { + receivedDatagram(readBuffer); + } + } catch (IOException e) { + processException(e); + } + + } + + if (write) { + // no much to do here + } + if (accept) { + throw new IllegalStateException("accept event should never occur on NioUdpSession"); + } + } + } http://git-wip-us.apache.org/repos/asf/mina/blob/ea688dc8/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpClient.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpClient.java b/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpClient.java index 794718f..6176129 100644 --- a/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpClient.java +++ b/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpClient.java @@ -29,9 +29,6 @@ import org.apache.mina.service.executor.IoHandlerExecutor; */ public abstract class AbstractTcpClient extends AbstractIoClient { - /** the default session configuration */ - private TcpSessionConfig config; - /** the connection timeout in milliseconds, after that delay the connection to remote server should fail. */ private int connectTimeoutInMillis = 10000; @@ -48,7 +45,7 @@ public abstract class AbstractTcpClient extends AbstractIoClient { */ @Override public TcpSessionConfig getSessionConfig() { - return config; + return (TcpSessionConfig) config; } /** http://git-wip-us.apache.org/repos/asf/mina/blob/ea688dc8/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpClient.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpClient.java b/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpClient.java index 22541f5..ec57016 100644 --- a/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpClient.java +++ b/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpClient.java @@ -39,6 +39,7 @@ public abstract class AbstractUdpClient extends AbstractIoClient { */ protected AbstractUdpClient(IoHandlerExecutor ioHandlerExecutor) { super(ioHandlerExecutor); + this.config = new DefaultUdpSessionConfig(); } /** @@ -58,4 +59,12 @@ public abstract class AbstractUdpClient extends AbstractIoClient { * succeeds or fails. */ public abstract IoFuture<IoSession> connect(SocketAddress remoteAddress, SocketAddress localAddress); + + /** + * {@inheritDoc} + */ + @Override + public UdpSessionConfig getSessionConfig() { + return (UdpSessionConfig) config; + } } http://git-wip-us.apache.org/repos/asf/mina/blob/ea688dc8/core/src/test/java/org/apache/mina/transport/tcp/NioTcpClientFilterEventTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/mina/transport/tcp/NioTcpClientFilterEventTest.java b/core/src/test/java/org/apache/mina/transport/tcp/NioTcpClientFilterEventTest.java index 053a823..fa00b9c 100644 --- a/core/src/test/java/org/apache/mina/transport/tcp/NioTcpClientFilterEventTest.java +++ b/core/src/test/java/org/apache/mina/transport/tcp/NioTcpClientFilterEventTest.java @@ -19,10 +19,7 @@ */ package org.apache.mina.transport.tcp; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; import java.io.IOException; import java.net.InetSocketAddress; @@ -42,13 +39,12 @@ import org.apache.mina.filterchain.ReadFilterChainController; import org.apache.mina.filterchain.WriteFilterChainController; import org.apache.mina.session.WriteRequest; import org.apache.mina.transport.nio.tcp.NioTcpClient; -import org.apache.mina.transport.nio.tcp.NioTcpServer; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * This class test the event dispatching of {@link NioTcpServer}. + * This class test the event dispatching of {@link NioTcpClient}. * * @author <a href="http://mina.apache.org">Apache MINA Project</a> */
