Author: jvermillard
Date: Wed May 23 15:35:21 2012
New Revision: 1341900
URL: http://svn.apache.org/viewvc?rev=1341900&view=rev
Log:
* one server => one server socket channel
* added more typing and generics
* start of NIO UDP server
Modified:
mina/trunk/core/src/main/java/org/apache/mina/api/IoServer.java
mina/trunk/core/src/main/java/org/apache/mina/service/SelectorProcessor.java
mina/trunk/core/src/main/java/org/apache/mina/service/server/AbstractIoServer.java
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpServer.java
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/nio/NioTcpServer.java
mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpServer.java
mina/trunk/core/src/main/java/org/apache/mina/transport/udp/nio/NioUdpServer.java
mina/trunk/examples/src/main/java/org/apache/mina/examples/echoserver/NioEchoServer.java
mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java
mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpsTest.java
mina/trunk/examples/src/main/java/org/apache/mina/examples/ldap/LdapTest.java
Modified: mina/trunk/core/src/main/java/org/apache/mina/api/IoServer.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/api/IoServer.java?rev=1341900&r1=1341899&r2=1341900&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/api/IoServer.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/api/IoServer.java Wed May 23
15:35:21 2012
@@ -21,19 +21,19 @@ package org.apache.mina.api;
import java.io.IOException;
import java.net.SocketAddress;
-import java.util.Set;
/**
*
+ * A network serer bound to a local addresse
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*
*/
public interface IoServer extends IoService {
/**
- * Returns a {@link Set} of the local addresses which are bound currently.
+ * Returns the local addresses which are bound currently.
*/
- Set<SocketAddress> getLocalAddresses();
+ SocketAddress getBoundAddress();
/**
* Binds to the specified local addresses and start to accept incoming
@@ -42,23 +42,15 @@ public interface IoServer extends IoServ
* @throws IOException
* if failed to bind
*/
- void bind(SocketAddress... localAddress) throws IOException;
+ void bind(SocketAddress localAddress) throws IOException;
/**
- * Unbinds from all local addresses that this service is bound to and stops
+ * Unbinds from the local addresses that this service is bound to and stops
* to accept incoming connections. This method returns silently if no local
* address is bound yet.
* @throws IOException
* if failed to unbind
*/
- void unbindAll() throws IOException;
+ void unbind() throws IOException;
- /**
- * Unbinds from the specified local addresses and stop to accept incoming
- * connections. This method returns silently if the default local addresses
- * are not bound yet.
- * @throws IOException
- * if failed to unbind
- */
- void unbind(SocketAddress... localAddresses) throws IOException;
}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/service/SelectorProcessor.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/service/SelectorProcessor.java?rev=1341900&r1=1341899&r2=1341900&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/service/SelectorProcessor.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/service/SelectorProcessor.java
Wed May 23 15:35:21 2012
@@ -21,7 +21,6 @@
package org.apache.mina.service;
import java.io.IOException;
-import java.net.SocketAddress;
import org.apache.mina.api.IoService;
import org.apache.mina.api.IoSession;
@@ -35,9 +34,12 @@ import org.apache.mina.transport.udp.Abs
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*
*/
-public interface SelectorProcessor {
+public interface SelectorProcessor<TCP_SERVER extends
AbstractTcpServer,UDP_SERVER extends AbstractUdpServer> {
-
+ /**
+ * The strategy to use for assigning selectors to newly created
sessions
+ * @param strategy
+ */
void setStrategy(SelectorStrategy<?> strategy);
/**
@@ -48,28 +50,29 @@ public interface SelectorProcessor {
void createSession(IoService service, Object clientSocket) throws
IOException;
/**
- * Bind and start processing this new server TCP address
- * @param server the server for the new address
- * @param address local address to bind
- * @throws IOException exception thrown if any problem occurs while binding
+ * Bind and start processing this newly bound TCP server
+ * @param server the server to be processed
*/
- void bindTcpServer(AbstractTcpServer server, SocketAddress address) throws
IOException;
+ void addServer(TCP_SERVER server);
/**
- * Bind and start processing this new server UDP address
- * @param server the server for the new address
- * @param address local address to bind
- * @throws IOException exception thrown if any problem occurs while binding
+ * Start processing this newly bound UDP
+ * @param server the server to be processed
*/
- void bindUdpServer(AbstractUdpServer server, SocketAddress address) throws
IOException;
+ void addServer(UDP_SERVER server);
/**
- * Stop processing and unbind this server address
- * @param address the local server address to unbind
- * @throws IOException exception thrown if any problem occurs while
unbinding
+ * Stop processing this TCP server
+ * @param server the server to be removed of processing
*/
- void unbind(SocketAddress address) throws IOException;
-
+ void removeServer(TCP_SERVER server);
+
+ /**
+ * Stop processing this UDP server
+ * @param server the server to be processed
+ */
+ void removeServer(UDP_SERVER server);
+
/**
* Schedule a session for flushing, will be called after a {@link
IoSession#write(Object)}.
* @param session the session to flush
Modified:
mina/trunk/core/src/main/java/org/apache/mina/service/server/AbstractIoServer.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/service/server/AbstractIoServer.java?rev=1341900&r1=1341899&r2=1341900&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/service/server/AbstractIoServer.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/service/server/AbstractIoServer.java
Wed May 23 15:35:21 2012
@@ -28,10 +28,31 @@ import org.apache.mina.service.AbstractI
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
public abstract class AbstractIoServer extends AbstractIoService implements
IoServer {
+
/**
* Create an new AbstractIoServer instance
*/
protected AbstractIoServer() {
super();
}
+
+ // does the reuse address flag should be positioned
+ private boolean reuseAddress = false;
+
+ /**
+ * Set the reuse address flag on the server socket
+ * @param reuseAddress <code>true</code> to enable
+ */
+ public void setReuseAddress(final boolean reuseAddress) {
+ this.reuseAddress = reuseAddress;
+ }
+
+ /**
+ * Is the reuse address enabled for this server.
+ * @return
+ */
+ public boolean isReuseAddress() {
+ return this.reuseAddress;
+ }
+
}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpServer.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpServer.java?rev=1341900&r1=1341899&r2=1341900&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpServer.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpServer.java
Wed May 23 15:35:21 2012
@@ -27,22 +27,32 @@ import org.apache.mina.service.server.Ab
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
public abstract class AbstractTcpServer extends AbstractIoServer {
+
+ // the default session configuration
+ private TcpSessionConfig config;
+
/**
* Create an new AbsractTcpServer instance
*/
protected AbstractTcpServer() {
super();
+ this.config = new DefaultTcpSessionConfig();
}
/**
- * Set the reuse address flag on the server socket
- * @param reuseAddress <code>true</code> to enable
+ * {@inheritDoc}
*/
- public abstract void setReuseAddress(boolean reuseAddress);
+ @Override
+ public TcpSessionConfig getSessionConfig() {
+ return this.config;
+ }
/**
- * Is the reuse address enabled for this server.
- * @return
+ * Set the default configuration for created TCP sessions
+ * @param config
*/
- public abstract boolean isReuseAddress();
+ public void setSessionConfig(final TcpSessionConfig config) {
+ this.config = config;
+ }
+
}
\ No newline at end of file
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java?rev=1341900&r1=1341899&r2=1341900&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
Wed May 23 15:35:21 2012
@@ -25,7 +25,6 @@ import static org.apache.mina.api.IoSess
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
-import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
@@ -42,7 +41,6 @@ import java.util.concurrent.locks.Reentr
import javax.net.ssl.SSLException;
import org.apache.mina.api.IdleStatus;
-import org.apache.mina.api.IoServer;
import org.apache.mina.api.IoService;
import org.apache.mina.api.IoSession;
import org.apache.mina.api.RuntimeIoException;
@@ -55,7 +53,8 @@ import org.apache.mina.session.AbstractI
import org.apache.mina.session.DefaultWriteFuture;
import org.apache.mina.session.SslHelper;
import org.apache.mina.session.WriteRequest;
-import org.apache.mina.transport.udp.AbstractUdpServer;
+import org.apache.mina.transport.tcp.nio.NioTcpServer;
+import org.apache.mina.transport.udp.nio.NioUdpServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,7 +64,8 @@ import org.slf4j.LoggerFactory;
*
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
-public class NioSelectorProcessor implements SelectorProcessor {
+public class NioSelectorProcessor implements
SelectorProcessor<NioTcpServer,NioUdpServer> {
+
/** A logger for this class */
private static final Logger LOGGER = LoggerFactory
.getLogger(NioSelectorProcessor.class);
@@ -80,8 +80,6 @@ public class NioSelectorProcessor implem
private final Map<SocketAddress, ServerSocketChannel>
serverSocketChannels = new ConcurrentHashMap<SocketAddress,
ServerSocketChannel>();
- private final Map<SocketAddress, DatagramChannel> datagramChannels =
new ConcurrentHashMap<SocketAddress, DatagramChannel>();
-
/** Read buffer for all the incoming bytes (default to 64Kb) */
private final ByteBuffer readBuffer = ByteBuffer.allocate(64 * 1024);
@@ -91,11 +89,18 @@ public class NioSelectorProcessor implem
/** helper for detecting idleing sessions */
private final IdleChecker idleChecker = new IndexedIdleChecker();
+
+ /** A queue containing the servers to bind to this selector */
+ private final Queue<NioTcpServer> tcpServersToadd = new
ConcurrentLinkedQueue<NioTcpServer>();
+
/** A queue containing the servers to bind to this selector */
- private final Queue<Object[]> serversToAdd = new
ConcurrentLinkedQueue<Object[]>();
+ private final Queue<NioUdpServer> udpServersToadd = new
ConcurrentLinkedQueue<NioUdpServer>();
+
+ /** server to remove of the selector */
+ private final Queue<NioTcpServer> tcpServersToRemove = new
ConcurrentLinkedQueue<NioTcpServer>();
/** server to remove of the selector */
- private final Queue<ServerSocketChannel> serversToRemove = new
ConcurrentLinkedQueue<ServerSocketChannel>();
+ private final Queue<NioUdpServer> udpServersToRemove = new
ConcurrentLinkedQueue<NioUdpServer>();
/**
* new session freshly accepted, placed here for being added to the
selector
@@ -123,19 +128,43 @@ public class NioSelectorProcessor implem
this.strategy =
(SelectorStrategy<NioSelectorProcessor>)strategy;
}
+
/**
- * Add a bound server channel for starting accepting new client
connections.
- *
- * @param serverChannel
- */
- private void add(final ServerSocketChannel serverChannel,
- final IoServer server) {
- LOGGER.debug("adding a server channel {} for server {}",
serverChannel,
- server);
- this.serversToAdd.add(new Object[] { serverChannel, server });
+ * {@inheritDoc}
+ */
+ @Override
+ public void addServer(NioTcpServer server) {
+ tcpServersToadd.add(server);
this.wakeupWorker();
}
-
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void addServer(NioUdpServer server) {
+ udpServersToadd.add(server);
+ this.wakeupWorker();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void removeServer(NioTcpServer server) {
+ tcpServersToRemove.add(server);
+ this.wakeupWorker();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void removeServer(NioUdpServer server) {
+ udpServersToRemove.add(server);
+ this.wakeupWorker();
+ }
+
/**
* Wake the I/O worker thread and if none exists, create a new one
FIXME :
* too much locking there ?
@@ -160,51 +189,6 @@ public class NioSelectorProcessor implem
* {@inheritDoc}
*/
@Override
- public void bindTcpServer(final AbstractTcpServer server,
- final SocketAddress address) throws IOException {
- ServerSocketChannel serverSocketChannel =
ServerSocketChannel.open();
-
serverSocketChannel.socket().setReuseAddress(server.isReuseAddress());
- serverSocketChannel.socket().bind(address);
- serverSocketChannel.configureBlocking(false);
- this.serverSocketChannels.put(address, serverSocketChannel);
- this.add(serverSocketChannel, server);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void bindUdpServer(AbstractUdpServer server, SocketAddress
address)
- throws IOException {
- DatagramChannel datagramChannel = DatagramChannel.open();
-
datagramChannel.socket().setReuseAddress(server.isReuseAddress());
- datagramChannel.socket().bind(address);
- datagramChannel.configureBlocking(false);
- this.datagramChannels.put(address, datagramChannel);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void unbind(final SocketAddress address) throws IOException {
- ServerSocketChannel channel =
this.serverSocketChannels.get(address);
- channel.socket().close();
- channel.close();
- if (this.serverSocketChannels.remove(address) == null) {
- LOGGER.warn(
- "The server channel for address {} was
already unbound",
- address);
- }
- LOGGER.debug("Removing a server channel {}", channel);
- this.serversToRemove.add(channel);
- this.wakeupWorker();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
public void createSession(final IoService service, final Object
clientSocket)
throws SSLException {
LOGGER.debug("create session");
@@ -331,12 +315,12 @@ public class NioSelectorProcessor implem
for (;;) {
try {
// pop server sockets for
removing
- if
(NioSelectorProcessor.this.serversToRemove.size() > 0) {
+ if
(NioSelectorProcessor.this.tcpServersToRemove.size() > 0 ||
NioSelectorProcessor.this.udpServersToRemove.size() > 0) {
this.processServerRemove();
}
// pop new server sockets for
accepting
- if
(NioSelectorProcessor.this.serversToAdd.size() > 0) {
+ if
(NioSelectorProcessor.this.tcpServersToadd.size() > 0 ||
NioSelectorProcessor.this.udpServersToadd.size() > 0) {
this.processServerAdd();
}
@@ -419,37 +403,54 @@ public class NioSelectorProcessor implem
}
}
- /**
- * Handles the servers removal
- */
- private void processServerRemove() {
- while
(!NioSelectorProcessor.this.serversToRemove.isEmpty()) {
- ServerSocketChannel channel =
NioSelectorProcessor.this.serversToRemove
- .poll();
- SelectionKey key =
this.serverKey.remove(channel);
-
- if (key == null) {
- LOGGER.error("The server socket was
already removed of the selector");
- } else {
- LOGGER.debug("Removing the server from
this selector : {}",
- key);
- key.cancel();
- }
- }
- }
/**
* Handles the servers addition
*/
private void processServerAdd() throws IOException {
- while
(!NioSelectorProcessor.this.serversToAdd.isEmpty()) {
- Object[] tmp =
NioSelectorProcessor.this.serversToAdd.poll();
- ServerSocketChannel channel =
(ServerSocketChannel) tmp[0];
- SelectionKey key = channel.register(
+
+ NioTcpServer tcpServer;
+ while ( ( tcpServer = tcpServersToadd.poll() ) != null
) {
+ // register for accept
+ SelectionKey key =
tcpServer.getServerSocketChannel().register(
NioSelectorProcessor.this.selector,
SelectionKey.OP_ACCEPT);
- key.attach(tmp);
- LOGGER.debug("Accepted the server on this
selector : {}", key);
+ key.attach(tcpServer);
+ tcpServer.setAcceptKey(key);
+ LOGGER.debug("registered for accept :
{}",tcpServer);
+ }
+
+ NioUdpServer udpServer;
+ while ( ( udpServer = udpServersToadd.poll() ) != null
) {
+ // register for read
+ SelectionKey key =
udpServer.getDatagramChannel().register(
+
NioSelectorProcessor.this.selector,
+ SelectionKey.OP_READ);
+ key.attach(udpServer);
+ udpServer.setReadKey(key);
+ LOGGER.debug("registered for accept :
{}",udpServer);
+ }
+ }
+
+ /**
+ * Handles the servers removal
+ */
+ private void processServerRemove() {
+ NioTcpServer tcpServer;
+ while ( ( tcpServer = tcpServersToRemove.poll() ) !=
null ) {
+ // find the server key and cancel it
+ SelectionKey key = tcpServer.getAcceptKey();
+ key.cancel();
+ tcpServer.setAcceptKey(null);
+ key.attach(null);
+ }
+ NioUdpServer udpServer;
+ while ( ( udpServer = udpServersToRemove.poll() ) !=
null ) {
+ // find the server key and cancel it
+ SelectionKey key = udpServer.getReadKey();
+ key.cancel();
+ udpServer.setReadKey(null);
+ key.attach(null);
}
}
@@ -510,11 +511,10 @@ public class NioSelectorProcessor implem
*/
private void processAccept(final SelectionKey key) throws
IOException {
LOGGER.debug("acceptable new client {}", key);
- ServerSocketChannel serverSocket =
(ServerSocketChannel) ((Object[]) key
- .attachment())[0];
- IoServer server = (IoServer) (((Object[])
key.attachment())[1]);
+ NioTcpServer server = (NioTcpServer) key.attachment();
+
// accepted connection
- SocketChannel newClientChannel = serverSocket.accept();
+ SocketChannel newClientChannel =
server.getServerSocketChannel().accept();
LOGGER.debug("client accepted");
// and give it's to the strategy
NioSelectorProcessor.this.strategy.getSelectorForNewSession(
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/nio/NioTcpServer.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/nio/NioTcpServer.java?rev=1341900&r1=1341899&r2=1341900&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/nio/NioTcpServer.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/nio/NioTcpServer.java
Wed May 23 15:35:21 2012
@@ -21,16 +21,12 @@ package org.apache.mina.transport.tcp.ni
import java.io.IOException;
import java.net.SocketAddress;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
import org.apache.mina.service.SelectorStrategy;
import org.apache.mina.transport.tcp.AbstractTcpServer;
-import org.apache.mina.transport.tcp.DefaultTcpSessionConfig;
import org.apache.mina.transport.tcp.NioSelectorProcessor;
-import org.apache.mina.transport.tcp.TcpSessionConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,117 +37,105 @@ import org.slf4j.LoggerFactory;
*/
public class NioTcpServer extends AbstractTcpServer {
static final Logger LOG = LoggerFactory.getLogger(NioTcpServer.class);
-
- // list of bound addresses
- private final Map<SocketAddress /* bound address */,NioSelectorProcessor
/* used processor */> addresses = new HashMap<SocketAddress,
NioSelectorProcessor>();
// the strategy for dispatching servers and client to selector threads.
private final SelectorStrategy<NioSelectorProcessor> strategy;
- // the default session confinguration
- private TcpSessionConfig config;
-
- private boolean reuseAddress = false;
-
+ // the bound local address
+ private SocketAddress address = null;
+
+ private NioSelectorProcessor acceptProcessor = null;
+
+ // the key used for selecting accept event
+ private SelectionKey acceptKey = null;
+
+ // the server socket for accepting clients
+ private ServerSocketChannel serverChannel = null;
+
public NioTcpServer(final SelectorStrategy<NioSelectorProcessor> strategy)
{
super();
this.strategy = strategy;
- this.config = new DefaultTcpSessionConfig();
}
-
+
/**
- * {@inheritDoc}
+ * Get the inner Server socket for accepting new client connections
+ * @return
*/
- @Override
- public TcpSessionConfig getSessionConfig() {
- return this.config;
+ public ServerSocketChannel getServerSocketChannel() {
+ return this.serverChannel;
}
- public void setSessionConfig(final TcpSessionConfig config) {
- this.config = config;
+ public void setServerSocketChannel(ServerSocketChannel serverChannel) {
+ this.serverChannel = serverChannel;
}
-
/**
* {@inheritDoc}
*/
@Override
- public void setReuseAddress(final boolean reuseAddress) {
- this.reuseAddress = reuseAddress;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean isReuseAddress() {
- return this.reuseAddress;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public synchronized void bind(final SocketAddress... localAddress) throws
IOException {
+ public synchronized void bind(final SocketAddress localAddress) throws
IOException {
if (localAddress == null) {
// We should at least have one address to bind on
throw new IllegalArgumentException("LocalAdress cannot be null");
}
- for (SocketAddress address : localAddress) {
- // check if the address is already bound
- synchronized (this) {
- if (this.addresses.containsKey(address)) {
- throw new IOException("address " + address + " already
bound");
- }
+ // check if the address is already bound
+ if (this.address != null) {
+ throw new IOException("address " + address + " already bound");
+ }
- LOG.info("binding address {}", address);
- NioSelectorProcessor processor =
this.strategy.getSelectorForBindNewAddress();
-
- this.addresses.put(address,processor);
+ LOG.info("binding address {}", localAddress);
+ this.address = localAddress;
+
+ serverChannel = ServerSocketChannel.open();
+ serverChannel.socket().setReuseAddress(isReuseAddress());
+ serverChannel.socket().bind(address);
+ serverChannel.configureBlocking(false);
+
+ acceptProcessor = this.strategy.getSelectorForBindNewAddress();
+
+ acceptProcessor.addServer(this);
- processor.bindTcpServer(this, address);
-
- if (this.addresses.size() == 1) {
- // it's the first address bound, let's fire the event
- this.fireServiceActivated();
- }
- }
- }
+ // it's the first address bound, let's fire the event
+ this.fireServiceActivated();
}
/**
* {@inheritDoc}
*/
@Override
- public synchronized Set<SocketAddress> getLocalAddresses() {
- return new HashSet<SocketAddress>(addresses.keySet());
+ public SocketAddress getBoundAddress() {
+ return address;
}
/**
* {@inheritDoc}
*/
@Override
- public synchronized void unbind(final SocketAddress... localAddresses)
throws IOException {
- for (SocketAddress socketAddress : localAddresses) {
- LOG.info("unbinding {}", socketAddress);
- addresses.get(socketAddress).unbind(socketAddress);
- this.addresses.remove(socketAddress);
- if (this.addresses.isEmpty()) {
- this.fireServiceInactivated();
- }
+ public synchronized void unbind() throws IOException {
+ LOG.info("unbinding {}", address);
+ if( this.address == null) {
+ throw new IllegalStateException("server not bound");
}
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public synchronized void unbindAll() throws IOException {
- LOG.info("unbinding all");
- for(SocketAddress address : addresses.keySet()) {
- LOG.debug("unbinding {}", address);
- addresses.remove(address).unbind(address);
- }
- }
-
+ serverChannel.socket().close();
+ serverChannel.close();
+ acceptProcessor.removeServer(this);
+
+ this.address = null;
+ this.fireServiceInactivated();
+ }
+
+ /**
+ * @return the acceptKey
+ */
+ public SelectionKey getAcceptKey() {
+ return acceptKey;
+ }
+
+ /**
+ * @param acceptKey the acceptKey to set
+ */
+ public void setAcceptKey(SelectionKey acceptKey) {
+ this.acceptKey = acceptKey;
+ }
+
}
\ No newline at end of file
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpServer.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpServer.java?rev=1341900&r1=1341899&r2=1341900&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpServer.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpServer.java
Wed May 23 15:35:21 2012
@@ -43,16 +43,4 @@ public abstract class AbstractUdpServer
public void initSecured(IoSession session) throws SSLException {
throw new RuntimeException("SSL is not supported for UDP");
}
-
- /**
- * Set the reuse address flag on the server socket
- * @param reuseAddress <code>true</code> to enable
- */
- public abstract void setReuseAddress(boolean reuseAddress);
-
- /**
- * Is the reuse address enabled for this server.
- * @return
- */
- public abstract boolean isReuseAddress();
}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/udp/nio/NioUdpServer.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/nio/NioUdpServer.java?rev=1341900&r1=1341899&r2=1341900&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/udp/nio/NioUdpServer.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/udp/nio/NioUdpServer.java
Wed May 23 15:35:21 2012
@@ -21,9 +21,8 @@ package org.apache.mina.transport.udp.ni
import java.io.IOException;
import java.net.SocketAddress;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
import org.apache.mina.api.IoSessionConfig;
import org.apache.mina.service.SelectorStrategy;
@@ -41,23 +40,40 @@ public class NioUdpServer extends Abstra
static final Logger LOG = LoggerFactory.getLogger(NioUdpServer.class);
- // list of bound addresses
- private final Set<SocketAddress> addresses =
Collections.synchronizedSet(new HashSet<SocketAddress>());
-
+ // the bound local address
+ private SocketAddress address = null;
+
// the strategy for dispatching servers and client to selector threads.
- private final SelectorStrategy strategy;
-
- private boolean reuseAddress = false;
+ private final SelectorStrategy<NioSelectorProcessor> strategy;
+ // the processor used for read and write this server
+ private NioSelectorProcessor processor;
+
+ // the inner channel for read/write UDP datagrams
+ private DatagramChannel datagramChannel = null;
+
+ // the key used for selecting read event
+ private SelectionKey readKey = null;
+
/**
* Create a new instance of NioUdpServer
*/
- public NioUdpServer(final SelectorStrategy strategy) {
+ public NioUdpServer(final SelectorStrategy<NioSelectorProcessor> strategy)
{
super();
this.strategy = strategy;
}
/**
+ * Get the inner datagram channel for read and write operations.
+ * To be called by the {@link NioSelectorProcessor}
+ *
+ * @return the datagram channel bound to this {@link NioUdpServer}.
+ */
+ public DatagramChannel getDatagramChannel() {
+ return datagramChannel;
+ }
+
+ /**
* {@inheritDoc}
*/
@Override
@@ -66,76 +82,77 @@ public class NioUdpServer extends Abstra
return null;
}
- /**
- * {@inheritDoc}
- */
- @Override
- public Set<SocketAddress> getLocalAddresses() {
- // TODO Auto-generated method stub
- return null;
- }
-
+
/**
* {@inheritDoc}
*/
@Override
- public void bind(final SocketAddress... localAddress) throws IOException {
+ public SocketAddress getBoundAddress() {
+ return address;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void bind(SocketAddress localAddress) throws IOException {
if (localAddress == null) {
// We should at least have one address to bind on
throw new IllegalArgumentException("LocalAdress cannot be null");
}
- for (SocketAddress address : localAddress) {
- // check if the address is already bound
- synchronized (this) {
- if (this.addresses.contains(address)) {
- throw new IOException("address " + address + " already
bound");
- }
-
- LOG.debug("binding address {}", address);
-
- this.addresses.add(address);
- NioSelectorProcessor processor = (NioSelectorProcessor)
this.strategy.getSelectorForBindNewAddress();
- processor.bindUdpServer(this, address);
- if (this.addresses.size() == 1) {
- // it's the first address bound, let's fire the event
- this.fireServiceActivated();
- }
- }
+ // check if the address is already bound
+ if (this.address != null) {
+ throw new IOException("address " + address + " already bound");
}
+ address = localAddress;
+
+ LOG.info("binding address {}", localAddress);
+
+ datagramChannel = DatagramChannel.open();
+
+ datagramChannel.socket().setReuseAddress(isReuseAddress());
+ datagramChannel.socket().bind(address);
+ datagramChannel.configureBlocking(false);
+
+ processor = this.strategy.getSelectorForBindNewAddress();
+
+ processor.addServer(this);
+
+ // it's the first address bound, let's fire the event
+ this.fireServiceActivated();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void unbind() throws IOException {
+ LOG.info("unbinding {}", address);
+ if( this.address == null) {
+ throw new IllegalStateException("server not bound");
+ }
+ datagramChannel.socket().close();
+ datagramChannel.close();
+
+ processor.removeServer(this);
+
+ this.address = null;
+ this.fireServiceInactivated();
}
/**
- * {@inheritDoc}
- */
- @Override
- public void unbindAll() throws IOException {
- // TODO Auto-generated method stub
-
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void unbind(SocketAddress... localAddresses) throws IOException {
- // TODO Auto-generated method stub
-
- }
-
- /**
- * {@inheritDoc}
+ * @return the readKey
*/
- @Override
- public void setReuseAddress(boolean reuseAddress) {
- this.reuseAddress = reuseAddress;
+ public SelectionKey getReadKey() {
+ return readKey;
}
/**
- * {@inheritDoc}
+ * @param readKey the readKey to set
*/
- @Override
- public boolean isReuseAddress() {
- return this.reuseAddress;
+ public void setReadKey(SelectionKey readKey) {
+ this.readKey = readKey;
}
-}
+
+}
\ No newline at end of file
Modified:
mina/trunk/examples/src/main/java/org/apache/mina/examples/echoserver/NioEchoServer.java
URL:
http://svn.apache.org/viewvc/mina/trunk/examples/src/main/java/org/apache/mina/examples/echoserver/NioEchoServer.java?rev=1341900&r1=1341899&r2=1341900&view=diff
==============================================================================
---
mina/trunk/examples/src/main/java/org/apache/mina/examples/echoserver/NioEchoServer.java
(original)
+++
mina/trunk/examples/src/main/java/org/apache/mina/examples/echoserver/NioEchoServer.java
Wed May 23 15:35:21 2012
@@ -131,7 +131,7 @@ public class NioEchoServer {
LOG.debug("Running the server for 25 sec");
Thread.sleep(25000);
LOG.debug("Unbinding the TCP port");
- acceptor.unbind(address);
+ acceptor.unbind();
} catch (IOException e) {
LOG.error("I/O exception", e);
} catch (InterruptedException e) {
Modified:
mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java
URL:
http://svn.apache.org/viewvc/mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java?rev=1341900&r1=1341899&r2=1341900&view=diff
==============================================================================
---
mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java
(original)
+++
mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java
Wed May 23 15:35:21 2012
@@ -62,7 +62,7 @@ public class HttpTest {
// run for 20 seconds
Thread.sleep(20000);
- acceptor.unbindAll();
+ acceptor.unbind();
}
Modified:
mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpsTest.java
URL:
http://svn.apache.org/viewvc/mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpsTest.java?rev=1341900&r1=1341899&r2=1341900&view=diff
==============================================================================
---
mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpsTest.java
(original)
+++
mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpsTest.java
Wed May 23 15:35:21 2012
@@ -65,7 +65,7 @@ public class HttpsTest {
// run for 20 seconds
Thread.sleep(20000);
- acceptor.unbindAll();
+ acceptor.unbind();
}
Modified:
mina/trunk/examples/src/main/java/org/apache/mina/examples/ldap/LdapTest.java
URL:
http://svn.apache.org/viewvc/mina/trunk/examples/src/main/java/org/apache/mina/examples/ldap/LdapTest.java?rev=1341900&r1=1341899&r2=1341900&view=diff
==============================================================================
---
mina/trunk/examples/src/main/java/org/apache/mina/examples/ldap/LdapTest.java
(original)
+++
mina/trunk/examples/src/main/java/org/apache/mina/examples/ldap/LdapTest.java
Wed May 23 15:35:21 2012
@@ -62,7 +62,7 @@ public class LdapTest {
// run for 20 seconds
Thread.sleep(200000);
- acceptor.unbindAll();
+ acceptor.unbind();
}