Author: trustin
Date: Fri Nov 9 03:36:01 2007
New Revision: 593496
URL: http://svn.apache.org/viewvc?rev=593496&view=rev
Log:
Resolved issue: DIRMINA-341 (Allow binding multiple SocketAddresses per
IoAcceptor.)
* All three acceptor implementations now binds to multiple addresses.
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoAcceptor.java
mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java
mina/trunk/core/src/main/java/org/apache/mina/common/IoAcceptor.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoAcceptor.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoAcceptor.java?rev=593496&r1=593495&r2=593496&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoAcceptor.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoAcceptor.java
Fri Nov 9 03:36:01 2007
@@ -24,8 +24,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.List;
/**
@@ -37,9 +36,9 @@
public abstract class AbstractIoAcceptor
extends AbstractIoService implements IoAcceptor {
- private final Set<SocketAddress> localAddresses = new
HashSet<SocketAddress>();
- private final Set<SocketAddress> unmodifiableLocalAddresses =
- Collections.unmodifiableSet(localAddresses);
+ private final List<SocketAddress> localAddresses = new
ArrayList<SocketAddress>();
+ private final List<SocketAddress> unmodifiableLocalAddresses =
+ Collections.unmodifiableList(localAddresses);
private boolean disconnectOnUnbind = true;
private boolean bound;
@@ -66,17 +65,24 @@
setLocalAddresses(localAddress);
}
- public Set<SocketAddress> getLocalAddresses() {
+ public List<SocketAddress> getLocalAddresses() {
return unmodifiableLocalAddresses;
}
- public void setLocalAddresses(Set<SocketAddress> localAddresses) {
+ public void setLocalAddresses(Iterable<SocketAddress> localAddresses) {
if (localAddresses == null) {
throw new NullPointerException("localAddresses");
}
- setLocalAddresses(
- localAddresses.toArray(new
SocketAddress[localAddresses.size()]));
+ List<SocketAddress> list = new ArrayList<SocketAddress>();
+ for (SocketAddress a: localAddresses) {
+ if (a == null) {
+ continue;
+ }
+ list.add(a);
+ }
+
+ setLocalAddresses(list.toArray(new SocketAddress[list.size()]));
}
public void setLocalAddresses(SocketAddress... localAddresses) {
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java?rev=593496&r1=593495&r2=593496&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java Fri
Nov 9 03:36:01 2007
@@ -84,7 +84,7 @@
throw new UnsupportedOperationException();
}
- public IoSession newSession(SocketAddress remoteAddress) {
+ public IoSession newSession(SocketAddress remoteAddress,
SocketAddress localAddress) {
throw new UnsupportedOperationException();
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoAcceptor.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoAcceptor.java?rev=593496&r1=593495&r2=593496&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoAcceptor.java
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoAcceptor.java Fri
Nov 9 03:36:01 2007
@@ -21,7 +21,7 @@
import java.io.IOException;
import java.net.SocketAddress;
-import java.util.Set;
+import java.util.List;
/**
* Accepts incoming connection, communicates with clients, and fires events to
@@ -50,11 +50,11 @@
SocketAddress getLocalAddress();
/**
- * Returns a [EMAIL PROTECTED] Set} of the local address to bind.
+ * Returns a [EMAIL PROTECTED] List} of the local addresses to bind.
*
* @throws IllegalStateException if this service is already running.
*/
- Set<SocketAddress> getLocalAddresses();
+ List<SocketAddress> getLocalAddresses();
/**
* Sets the local address to bind.
@@ -79,7 +79,7 @@
*
* @throws IllegalStateException if this service is already running.
*/
- void setLocalAddresses(Set<SocketAddress> localAddresses);
+ void setLocalAddresses(Iterable<SocketAddress> localAddresses);
/**
* Returns <tt>true</tt> if and only if all clients are disconnected
@@ -108,9 +108,9 @@
void unbind();
/**
- * (Optional) Returns an [EMAIL PROTECTED] IoSession} that is bound to the
current
- * local address and the specified <tt>remoteAddress</tt> which reuses
- * the local address that is already bound by this service.
+ * (Optional) Returns an [EMAIL PROTECTED] IoSession} that is bound to the
specified
+ * <tt>localAddress</tt> and the specified <tt>remoteAddress</tt> which
+ * reuses the local address that is already bound by this service.
* <p>
* This operation is optional. Please throw [EMAIL PROTECTED]
UnsupportedOperationException}
* if the transport type doesn't support this operation. This operation is
@@ -118,6 +118,8 @@
*
* @throws UnsupportedOperationException if this operation is not supported
* @throws IllegalStateException if this service is not running.
+ * @throws IllegalArgumentException if this service is not bound to the
+ * specified <tt>localAddress</tt>.
*/
- IoSession newSession(SocketAddress remoteAddress);
+ IoSession newSession(SocketAddress remoteAddress, SocketAddress
localAddress);
}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java?rev=593496&r1=593495&r2=593496&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java
Fri Nov 9 03:36:01 2007
@@ -26,7 +26,12 @@
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -71,10 +76,11 @@
private final Queue<ServiceOperationFuture> registerQueue = new
ConcurrentLinkedQueue<ServiceOperationFuture>();
private final Queue<ServiceOperationFuture> cancelQueue = new
ConcurrentLinkedQueue<ServiceOperationFuture>();
private final Queue<NioDatagramSession> flushingSessions = new
ConcurrentLinkedQueue<NioDatagramSession>();
+ private final Map<SocketAddress, DatagramChannel> serverChannels =
+ Collections.synchronizedMap(new HashMap<SocketAddress,
DatagramChannel>());
private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
- private DatagramChannel channel;
private Worker worker;
private long lastIdleCheckTime;
@@ -144,7 +150,11 @@
throw request.getException();
}
- setLocalAddress(channel.socket().getLocalSocketAddress());
+ Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
+ for (DatagramChannel c: serverChannels.values()) {
+ newLocalAddresses.add(c.socket().getLocalSocketAddress());
+ }
+ setLocalAddresses(newLocalAddresses);
}
@Override
@@ -162,7 +172,7 @@
}
}
- public IoSession newSession(SocketAddress remoteAddress) {
+ public IoSession newSession(SocketAddress remoteAddress, SocketAddress
localAddress) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
@@ -173,19 +183,23 @@
"Can't create a session from a unbound service.");
}
- return newSessionWithoutLock(remoteAddress);
+ return newSessionWithoutLock(remoteAddress, localAddress);
}
}
- private IoSession newSessionWithoutLock(SocketAddress remoteAddress) {
+ private IoSession newSessionWithoutLock(
+ SocketAddress remoteAddress, SocketAddress localAddress) {
Selector selector = this.selector;
- DatagramChannel ch = this.channel;
+ DatagramChannel ch = serverChannels.get(localAddress);
+ if (ch == null) {
+ throw new IllegalArgumentException("Unknown local address: " +
localAddress);
+ }
SelectionKey key = ch.keyFor(selector);
IoSession session;
IoSessionRecycler sessionRecycler = getSessionRecycler();
synchronized (sessionRecycler) {
- session = sessionRecycler.recycle(getLocalAddress(),
remoteAddress);
+ session = sessionRecycler.recycle(localAddress, remoteAddress);
if (session != null) {
return session;
}
@@ -365,7 +379,8 @@
SocketAddress remoteAddress = channel.receive(readBuf.buf());
if (remoteAddress != null) {
- NioDatagramSession session = (NioDatagramSession)
newSessionWithoutLock(remoteAddress);
+ IoSession session = newSessionWithoutLock(
+ remoteAddress, channel.socket().getLocalSocketAddress());
readBuf.flip();
@@ -469,35 +484,61 @@
break;
}
- DatagramChannel ch = null;
- try {
- ch = DatagramChannel.open();
- DatagramSessionConfig cfg = getSessionConfig();
- ch.socket().setReuseAddress(cfg.isReuseAddress());
- ch.socket().setBroadcast(cfg.isBroadcast());
- ch.socket().setReceiveBufferSize(cfg.getReceiveBufferSize());
- ch.socket().setSendBufferSize(cfg.getSendBufferSize());
+ Map<SocketAddress, DatagramChannel> newServerChannels =
+ new HashMap<SocketAddress, DatagramChannel>();
+ List<SocketAddress> localAddresses = getLocalAddresses();
- if (ch.socket().getTrafficClass() != cfg.getTrafficClass()) {
- ch.socket().setTrafficClass(cfg.getTrafficClass());
+ try {
+ for (SocketAddress a: localAddresses) {
+ DatagramChannel c = null;
+ boolean success = false;
+ try {
+ c = DatagramChannel.open();
+ DatagramSessionConfig cfg = getSessionConfig();
+ c.socket().setReuseAddress(cfg.isReuseAddress());
+ c.socket().setBroadcast(cfg.isBroadcast());
+
c.socket().setReceiveBufferSize(cfg.getReceiveBufferSize());
+ c.socket().setSendBufferSize(cfg.getSendBufferSize());
+
+ if (c.socket().getTrafficClass() !=
cfg.getTrafficClass()) {
+ c.socket().setTrafficClass(cfg.getTrafficClass());
+ }
+
+ c.configureBlocking(false);
+ c.socket().bind(a);
+ c.register(selector, SelectionKey.OP_READ, req);
+ success = true;
+ } finally {
+ if (c != null && !success) {
+ try {
+ c.disconnect();
+ c.close();
+ } catch (Throwable e) {
+
ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ }
+ }
+
+ newServerChannels.put(c.socket().getLocalSocketAddress(),
c);
}
-
- ch.configureBlocking(false);
- ch.socket().bind(getLocalAddress());
- ch.register(selector, SelectionKey.OP_READ, req);
- this.channel = ch;
-
+
+ serverChannels.putAll(newServerChannels);
+
getListeners().fireServiceActivated();
req.setDone();
} catch (Exception e) {
req.setException(e);
} finally {
- if (ch != null && req.getException() != null) {
- try {
- ch.disconnect();
- ch.close();
- } catch (Throwable e) {
- ExceptionMonitor.getInstance().exceptionCaught(e);
+ // Roll back if failed to bind all addresses.
+ if (req.getException() != null) {
+ for (DatagramChannel c: newServerChannels.values()) {
+ c.keyFor(selector).cancel();
+ try {
+ c.disconnect();
+ c.close();
+ } catch (IOException e) {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
}
}
}
@@ -511,22 +552,22 @@
break;
}
- DatagramChannel ch = this.channel;
- this.channel = null;
+ // close the channels
+ for (DatagramChannel c: serverChannels.values()) {
+ try {
+ SelectionKey key = c.keyFor(selector);
+ key.cancel();
- // close the channel
- try {
- SelectionKey key = ch.keyFor(selector);
- key.cancel();
- selector.wakeup(); // wake up again to trigger thread death
- ch.disconnect();
- ch.close();
- } catch (Throwable t) {
- ExceptionMonitor.getInstance().exceptionCaught(t);
- } finally {
- getListeners().fireServiceDeactivated();
- request.setDone();
+ selector.wakeup(); // wake up again to trigger thread death
+ c.disconnect();
+ c.close();
+ } catch (IOException e) {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
}
+
+ serverChannels.clear();
+ request.setDone();
}
}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java?rev=593496&r1=593495&r2=593496&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java
Fri Nov 9 03:36:01 2007
@@ -28,7 +28,12 @@
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -75,7 +80,8 @@
private final Queue<ServiceOperationFuture> cancelQueue =
new ConcurrentLinkedQueue<ServiceOperationFuture>();
- private ServerSocketChannel serverSocketChannel;
+ private final Map<SocketAddress, ServerSocketChannel> serverChannels =
+ Collections.synchronizedMap(new HashMap<SocketAddress,
ServerSocketChannel>());
private final Selector selector;
private Worker worker;
@@ -226,11 +232,14 @@
throw request.getException();
}
- // Update the local address.
- // setLocalAddress() shouldn't be called from the worker thread
+ // Update the local addresses.
+ // setLocalAddresses() shouldn't be called from the worker thread
// because of deadlock.
- setLocalAddress(serverSocketChannel.socket()
- .getLocalSocketAddress());
+ Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
+ for (ServerSocketChannel c: serverChannels.values()) {
+ newLocalAddresses.add(c.socket().getLocalSocketAddress());
+ }
+ setLocalAddresses(newLocalAddresses);
}
/**
@@ -382,40 +391,62 @@
* Registers OP_ACCEPT for selector
*/
private void registerNew() {
- for (; ;) {
+ for (;;) {
ServiceOperationFuture future = registerQueue.poll();
if (future == null) {
break;
}
- ServerSocketChannel ssc = null;
-
+ Map<SocketAddress, ServerSocketChannel> newServerChannels =
+ new HashMap<SocketAddress, ServerSocketChannel>();
+ List<SocketAddress> localAddresses = getLocalAddresses();
+
try {
- ssc = ServerSocketChannel.open();
- ssc.configureBlocking(false);
-
- // Configure the server socket,
- ssc.socket().setReuseAddress(isReuseAddress());
- ssc.socket().setReceiveBufferSize(
- getSessionConfig().getReceiveBufferSize());
-
- // and bind.
- ssc.socket().bind(getLocalAddress(), getBacklog());
- ssc.register(selector, SelectionKey.OP_ACCEPT, future);
-
- serverSocketChannel = ssc;
+ for (SocketAddress a: localAddresses) {
+ ServerSocketChannel c = null;
+ boolean success = false;
+ try {
+ c = ServerSocketChannel.open();
+ c.configureBlocking(false);
+ // Configure the server socket,
+ c.socket().setReuseAddress(isReuseAddress());
+ c.socket().setReceiveBufferSize(
+ getSessionConfig().getReceiveBufferSize());
+ // and bind.
+ c.socket().bind(a, getBacklog());
+ c.register(selector, SelectionKey.OP_ACCEPT, future);
+ success = true;
+ } finally {
+ if (!success && c != null) {
+ try {
+ c.close();
+ } catch (IOException e) {
+
ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ }
+ }
+
+ newServerChannels.put(c.socket().getLocalSocketAddress(),
c);
+ }
+
+ serverChannels.putAll(newServerChannels);
// and notify.
future.setDone();
} catch (Exception e) {
future.setException(e);
} finally {
- if (ssc != null && future.getException() != null) {
- try {
- ssc.close();
- } catch (IOException e) {
- ExceptionMonitor.getInstance().exceptionCaught(e);
+ // Roll back if failed to bind all addresses.
+ if (future.getException() != null) {
+ for (ServerSocketChannel c: newServerChannels.values()) {
+ c.keyFor(selector).cancel();
+ try {
+ c.close();
+ } catch (IOException e) {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
}
+
}
}
}
@@ -434,27 +465,25 @@
break;
}
- // close the channel
- try {
- SelectionKey key = serverSocketChannel.keyFor(selector);
- key.cancel();
-
- selector.wakeup(); // wake up again to trigger thread death
+ // close the channels
+ for (ServerSocketChannel c: serverChannels.values()) {
+ try {
+ SelectionKey key = c.keyFor(selector);
+ key.cancel();
- serverSocketChannel.close();
- serverSocketChannel = null;
- } catch (IOException e) {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- } finally {
- future.setDone();
+ selector.wakeup(); // wake up again to trigger thread death
+ c.close();
+ } catch (IOException e) {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
}
+
+ serverChannels.clear();
+ future.setDone();
}
}
- /**
- * @see
org.apache.mina.common.IoAcceptor#newSession(java.net.SocketAddress)
- */
- public IoSession newSession(SocketAddress remoteAddress) {
+ public IoSession newSession(SocketAddress remoteAddress, SocketAddress
localAddress) {
throw new UnsupportedOperationException();
}
}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java?rev=593496&r1=593495&r2=593496&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java
Fri Nov 9 03:36:01 2007
@@ -21,7 +21,9 @@
import java.io.IOException;
import java.net.SocketAddress;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.mina.common.AbstractIoAcceptor;
@@ -71,33 +73,50 @@
@Override
protected void doBind() throws IOException {
- VmPipeAddress localAddress = getLocalAddress();
+ List<SocketAddress> localAddresses = getLocalAddresses();
+ List<SocketAddress> newLocalAddresses = new ArrayList<SocketAddress>();
synchronized (boundHandlers) {
- if (localAddress == null || localAddress.getPort() == 0) {
- localAddress = null;
- for (int i = 1; i < Integer.MAX_VALUE; i++) {
- VmPipeAddress newLocalAddress = new VmPipeAddress(i);
- if (!boundHandlers.containsKey(newLocalAddress)) {
- localAddress = newLocalAddress;
- break;
+ for (SocketAddress a: localAddresses) {
+ VmPipeAddress localAddress = (VmPipeAddress) a;
+ if (localAddress.getPort() == 0) {
+ localAddress = null;
+ for (int i = 1; i < Integer.MAX_VALUE; i++) {
+ VmPipeAddress newLocalAddress = new VmPipeAddress(i);
+ if (!boundHandlers.containsKey(newLocalAddress) &&
+ !newLocalAddresses.contains(newLocalAddress)) {
+ localAddress = newLocalAddress;
+ break;
+ }
}
+
+ if (localAddress == null) {
+ throw new IOException("No port available.");
+ }
+ } else if (localAddress.getPort() < 0) {
+ throw new IOException("Bind port number must be 0 or
above.");
+ } else if (boundHandlers.containsKey(localAddress)) {
+ throw new IOException("Address already bound: " +
localAddress);
}
+
+ newLocalAddresses.add(localAddress);
+ }
- if (localAddress == null) {
- throw new IOException("No port available.");
+ for (SocketAddress a: newLocalAddresses) {
+ VmPipeAddress localAddress = (VmPipeAddress) a;
+ if (!boundHandlers.containsKey(localAddress)) {
+ boundHandlers.put(localAddress, new VmPipe(this,
localAddress,
+ getHandler(), getListeners()));
+ } else {
+ for (SocketAddress a2: newLocalAddresses) {
+ boundHandlers.remove(a2);
+ }
+ break;
}
- } else if (localAddress.getPort() < 0) {
- throw new IOException("Bind port number must be 0 or above.");
- } else if (boundHandlers.containsKey(localAddress)) {
- throw new IOException("Address already bound: " +
localAddress);
}
-
- boundHandlers.put(localAddress, new VmPipe(this, localAddress,
- getHandler(), getListeners()));
}
- setLocalAddress(localAddress);
+ setLocalAddresses(newLocalAddresses);
}
@Override
@@ -109,7 +128,7 @@
getListeners().fireServiceDeactivated();
}
- public IoSession newSession(SocketAddress remoteAddress) {
+ public IoSession newSession(SocketAddress remoteAddress, SocketAddress
localAddress) {
throw new UnsupportedOperationException();
}