Hi,
I made a lot of changes to the current datagram implementation in
trunk. DatagramAcceptor now collaborates with DatagramConnector to
create a connected datagram socket that listens to the remote address
of each received packet. The following is a simplified description of
the flow:
* DatagramAcceptor's underlying DatagramChannel.receive() is invoked.
* Once data is received, a new connected datagram session is created
using DatagramConnector.connect(receivedRemoteAddress,
acceptor.getLocalAddress()). If there's already a session instance
associated with the remote address, it's reused.
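As a rough sketch of the create-or-reuse step above, in plain Java
rather than the actual MINA code: SessionTable and the connect
function passed to it are hypothetical names, and the session type is
left generic. It uses Java 8 APIs for brevity.

```java
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical helper, not part of MINA: keeps one session per remote address.
class SessionTable<S> {
    private final ConcurrentMap<SocketAddress, S> sessions =
            new ConcurrentHashMap<>();
    // Stand-in for DatagramConnector.connect(remote, acceptorLocalAddress).
    private final Function<SocketAddress, S> connect;

    SessionTable(Function<SocketAddress, S> connect) {
        this.connect = connect;
    }

    // Returns the existing session for this remote address,
    // or creates one through the connect function.
    S sessionFor(SocketAddress remote) {
        return sessions.computeIfAbsent(remote, connect);
    }

    // Forgets the session, e.g. after it has been closed.
    void remove(SocketAddress remote) {
        sessions.remove(remote);
    }

    int size() {
        return sessions.size();
    }

    public static void main(String[] args) {
        SessionTable<String> table =
                new SessionTable<>(remote -> "session for " + remote);
        SocketAddress peer = new InetSocketAddress("127.0.0.1", 5678);
        // Both lookups yield the same session object.
        System.out.println(table.sessionFor(peer) == table.sessionFor(peer));
    }
}
```

computeIfAbsent keeps the lookup and the creation atomic per key, so
two packets from the same peer arriving at once still end up with a
single session.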
The advantages of this implementation are:
* The acceptor-side datagram sessions can perform much better in a
multi-core environment.
* There's no need for IoSessionRecycler on the acceptor side anymore.
You have full control over disconnection, most likely via the
sessionIdle event.
* Code duplication can be reduced once again, and most of the
existing acceptor code can be extracted into a generic acceptor
implementation.
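To illustrate the second point, idle-based disconnection amounts to
tracking the last activity per remote address and closing whatever
has been quiet for too long. The sketch below is plain Java, not the
MINA API: IdleTracker, touch() and expire() are hypothetical names,
and in MINA the equivalent decision would be made in the handler's
sessionIdle callback.

```java
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of MINA: decides which peers are idle.
class IdleTracker {
    private final Map<SocketAddress, Long> lastActivity =
            new ConcurrentHashMap<>();
    private final long idleTimeoutMillis;

    IdleTracker(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    // Record traffic from a peer at the given time.
    void touch(SocketAddress remote, long nowMillis) {
        lastActivity.put(remote, nowMillis);
    }

    // Remove and return every peer that has been silent for at least
    // the timeout; the caller would close the matching sessions.
    List<SocketAddress> expire(long nowMillis) {
        List<SocketAddress> expired = new ArrayList<>();
        Iterator<Map.Entry<SocketAddress, Long>> it =
                lastActivity.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<SocketAddress, Long> entry = it.next();
            if (nowMillis - entry.getValue() >= idleTimeoutMillis) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }
}
```

The time is passed in explicitly so the policy can be exercised
without sleeping; a real caller would pass
System.currentTimeMillis().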
The disadvantage is that it forces you to set reuseAddress to true,
because multiple datagram channels are bound to the same local
address.
On Linux, connected datagram channels have priority over unconnected
channels, so most traffic will go directly into DatagramConnector's
NIOProcessors. The only exception I've found is broadcast packets.
They are always received from the unconnected channel in
DatagramAcceptor.
DatagramAcceptor immediately forwards the event to an appropriate
session that DatagramConnector is managing, or creates a new session
using connect().
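The shared binding and the priority rule can also be probed outside
MINA with plain java.nio (Java 7 or later). This is only a sketch:
SharedBindProbe is a hypothetical name, it uses the loopback address
and an ephemeral port, and which channel receives the packet depends
on the operating system, so it reports the outcome rather than
assuming one.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;

// Hypothetical probe, not part of MINA.
class SharedBindProbe {

    // Returns "connected", "unconnected" or "none", depending on
    // which channel saw the packet sent by the client.
    static String probe() throws IOException, InterruptedException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (DatagramChannel client = DatagramChannel.open();
             DatagramChannel unconnected = DatagramChannel.open();
             DatagramChannel connected = DatagramChannel.open()) {

            client.bind(new InetSocketAddress(loopback, 0));

            // Acceptor-like channel: bound but never connected.
            unconnected.setOption(StandardSocketOptions.SO_REUSEADDR, true);
            unconnected.bind(new InetSocketAddress(loopback, 0));
            InetSocketAddress shared =
                    (InetSocketAddress) unconnected.getLocalAddress();

            // Session-like channel: same local address, connected to
            // the client. This bind fails if the platform does not
            // allow sharing the address.
            connected.setOption(StandardSocketOptions.SO_REUSEADDR, true);
            connected.bind(shared);
            connected.connect(client.getLocalAddress());

            unconnected.configureBlocking(false);
            connected.configureBlocking(false);

            client.send(ByteBuffer.allocate(1), shared);

            // Poll both channels for up to about one second.
            ByteBuffer buf = ByteBuffer.allocate(16);
            for (int i = 0; i < 100; i++) {
                if (connected.receive(buf) != null) {
                    return "connected";
                }
                if (unconnected.receive(buf) != null) {
                    return "unconnected";
                }
                Thread.sleep(10);
            }
            return "none";
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Packet delivered to: " + probe());
    }
}
```

If the second bind throws, the platform does not allow two datagram
channels on the same local address with SO_REUSEADDR alone, which is
exactly the case the approach cannot handle.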
I think this behavior won't differ much across platforms (e.g.
Windows and SunOS). As long as multiple datagram channels can be
bound to the same local address, this implementation should work
fine. To make sure, please run the following test code on your
machine, and let me know what the result is:
-------- CODE BEGINS --------
package net.gleamynode.tmp;

import java.net.InetAddress;
import java.net.InetSocketAddress;

import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.transport.socket.nio.DatagramAcceptor;
import org.apache.mina.transport.socket.nio.DatagramConnector;

public class Main {
    private static final int SERVER_PORT = 1234;
    private static final int CLIENT_PORT = 5678;

    public static void main(String[] args) throws Exception {
        DatagramAcceptor acceptor = new DatagramAcceptor();
        acceptor.setLocalAddress(new InetSocketAddress(SERVER_PORT));
        acceptor.setHandler(new IoHandlerAdapter() {
            @Override
            public void sessionOpened(IoSession session) {
                System.out.println("Open: " + session);
            }

            @Override
            public void sessionClosed(IoSession session) {
                System.out.println("Closed: " + session);
            }

            @Override
            public void messageReceived(IoSession session, Object o) {
                System.out.println("Received: " + session + ", " + o);
            }
        });
        acceptor.bind();

        DatagramConnector connector = new DatagramConnector();
        connector.getSessionConfig().setReuseAddress(true);
        connector.setHandler(new IoHandlerAdapter() {
            @Override
            public void messageReceived(IoSession session, Object o) {
                System.out.println("ERR?");
            }
        });

        // Print the local host.
        System.out.println("Local host: " + InetAddress.getLocalHost());

        // Try point-to-point.
        ConnectFuture f1 = connector.connect(
                new InetSocketAddress(
                        InetAddress.getLocalHost(), SERVER_PORT),
                new InetSocketAddress(
                        InetAddress.getLocalHost(), CLIENT_PORT));
        f1.await();
        f1.getSession().write(ByteBuffer.allocate(1)).await();
        f1.getSession().write(ByteBuffer.allocate(1)).await();

        // Try broadcast.
        ConnectFuture f2 = connector.connect(
                new InetSocketAddress(
                        "255.255.255.255", SERVER_PORT),
                new InetSocketAddress(
                        InetAddress.getLocalHost(), CLIENT_PORT));
        f2.await();
        f2.getSession().write(ByteBuffer.allocate(1)).await();
        f2.getSession().write(ByteBuffer.allocate(1)).await();

        acceptor.unbind();
        System.exit(0);
    }
}
-------- CODE ENDS --------
The expected result is:
-------- RESULT BEGINS --------
Local host: primary/127.0.0.1
Open: (datagram, server, /127.0.0.1:5678 => /0:0:0:0:0:0:0:0:1234)
Received: (datagram, server, /127.0.0.1:5678 =>
/0:0:0:0:0:0:0:0:1234), HeapBuffer[pos=0 lim=1 cap=2048: 00]
Received: (datagram, server, /127.0.0.1:5678 =>
/0:0:0:0:0:0:0:0:1234), HeapBuffer[pos=0 lim=1 cap=2048: 00]
Received: (datagram, server, /127.0.0.1:5678 =>
/0:0:0:0:0:0:0:0:1234), HeapBuffer[pos=0 lim=1 cap=2048: 00]
Closed: (datagram, server, /127.0.0.1:5678 => /0:0:0:0:0:0:0:0:1234)
-------- RESULT ENDS --------
If this test fails on any platform that runs Java 5, I am doomed to
revert my changes. :)
Trustin
--
what we call human nature is actually human habit
--
http://gleamynode.net/
--
PGP Key ID: 0x0255ECA6