Author: dblevins
Date: Tue Apr 6 15:27:00 2010
New Revision: 931184
URL: http://svn.apache.org/viewvc?rev=931184&view=rev
Log:
Looking good. Added the real IO code. All nodes find each other directly or
indirectly. Heartbeat happens at a configurable interval. All with one thread.
Modified:
openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/EchoNet.java
Modified:
openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/EchoNet.java
URL:
http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/EchoNet.java?rev=931184&r1=931183&r2=931184&view=diff
==============================================================================
---
openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/EchoNet.java
(original)
+++
openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/EchoNet.java
Tue Apr 6 15:27:00 2010
@@ -16,27 +16,28 @@
*/
package org.apache.openejb.server.discovery;
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
-import java.net.SocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
+import static java.nio.channels.SelectionKey.OP_CONNECT;
import static java.nio.channels.SelectionKey.OP_READ;
import static java.nio.channels.SelectionKey.OP_WRITE;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
/**
@@ -45,19 +46,13 @@ import java.util.concurrent.CountDownLat
public class EchoNet {
public static void main(String[] args) throws Exception {
- listen(3333);
-
Server a = new Server(4444).start();
Server b = new Server(5555).start();
-// Server c = new Server(6666).start();
- a.connect(3333);
-// b.connect(3333);
+ Server c = new Server(6666).start();
a.connect(b);
-// b.connect(a);
-// b.connect(c);
-// c.connect(a);
-// c.connect(b);
+ c.connect(b);
+ // A and C should hookup through B
new CountDownLatch(1).await();
}
@@ -78,58 +73,51 @@ public class EchoNet {
ServerSocket serverSocket = new ServerSocket(port);
while (true) {
final Socket socket = serverSocket.accept();
-// new Thread(new Runnable() {
-// public void run() {
-// try {
-// read(port, socket);
-// } catch (IOException e) {
-// e.printStackTrace();
-// }
-// }
-// }).start();
+ new Thread(new Runnable() {
+ public void run() {
+ try {
+ read(port, socket);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }).start();
}
}
private static void read(int port, Socket socket) throws IOException {
-// System.out.println(port + " - accept = " +
socket.getRemoteSocketAddress());
+ System.out.println(port + " - accept = " +
socket.getRemoteSocketAddress());
InputStream in = socket.getInputStream();
int i = -1;
byte[] buffer = new byte[1024];
while ((i = in.read(buffer)) != -1) {
- buffer[i++] = (byte)'\n';
-// System.out.write(buffer, 0, i);
+ buffer[i++] = (byte) '\n';
+ System.out.write(buffer, 0, i);
}
}
public static class Server {
- private int port;
- private Selector selector;
-
+ private final int port;
+ private final Selector selector;
private final URI me;
- public Server(int port) {
+ public Server(int port) throws IOException {
this.port = port;
me = URI.create("conn://localhost:" + port);
- ServerSocketChannel serverChannel;
- try {
- serverChannel = ServerSocketChannel.open();
- ServerSocket ss = serverChannel.socket();
- InetSocketAddress address = new InetSocketAddress(port);
- ss.bind(address);
- serverChannel.configureBlocking(false);
+ ServerSocketChannel serverChannel = ServerSocketChannel.open();
- selector = Selector.open();
+ ServerSocket serverSocket = serverChannel.socket();
+ InetSocketAddress address = new InetSocketAddress(port);
+ serverSocket.bind(address);
+ serverChannel.configureBlocking(false);
- serverChannel.register(selector, SelectionKey.OP_ACCEPT);
+ selector = Selector.open();
- println("Listening");
+ serverChannel.register(selector, SelectionKey.OP_ACCEPT);
- } catch (IOException ex) {
- ex.printStackTrace();
- return;
- }
+ println("Listening");
}
public Server start() {
@@ -145,42 +133,30 @@ public class EchoNet {
return this;
}
-
- private void printOps(SelectionKey key) {
- StringBuilder sb = new StringBuilder(key.hashCode() + " opts: ");
- if (key.isAcceptable()) sb.append("a");
- if (key.isConnectable()) sb.append("c");
- if (key.isReadable()) sb.append("r");
- if (key.isWritable()) sb.append("w");
- println(sb.toString());
- }
-
public class Session {
- private final LinkedList<ByteBuffer> buffers = new
LinkedList<ByteBuffer>();
- private final ByteBuffer read = ByteBuffer.allocate(1024);
- private final ByteBuffer write = ByteBuffer.allocate(1024);
-
- public SelectionKey key;
+ private static final int EOF = 3;
- public InetSocketAddress address;
+ private final SocketChannel channel;
+ private final ByteBuffer read = ByteBuffer.allocate(1024);
+ private final SelectionKey key;
+ private final InetSocketAddress address;
+ private final List<URI> listed = new ArrayList<URI>();
+ private ByteBuffer write;
private State state = State.GREETING;
- private final boolean server;
- private Iterator<URI> listing;
- private List<URI> listed = new ArrayList<URI>();
- public URI uri;
+ private URI uri;
- public Session(boolean server) {
- this.server = server;
+ public Session(SocketChannel channel, InetSocketAddress address,
URI uri) throws ClosedChannelException {
+ this.channel = channel;
+ this.address = address;
+ this.uri = uri != null ? uri : URI.create("conn://" +
address.getHostName() + ":" + address.getPort());
+ this.key = channel.register(selector, 0, this);
}
- public ByteBuffer pop() {
- try {
- return buffers.removeFirst();
- } finally {
- if (buffers.size() == 0) key.interestOps(OP_READ);
- }
+ public Session ops(int ops) {
+ key.interestOps(ops);
+ return this;
}
private void println(String s) {
@@ -192,7 +168,12 @@ public class EchoNet {
this.state = state;
key.interestOps(ops);
- trace("*");
+// trace("*");
+ }
+
+ public void setURI(URI uri) {
+ seen(uri);
+ this.uri = uri;
}
private void trace(String str) {
@@ -201,8 +182,6 @@ public class EchoNet {
sb.append(" ");
if ((key.interestOps() & OP_READ) == OP_READ) sb.append("<");
if ((key.interestOps() & OP_WRITE) == OP_WRITE) sb.append(">");
-// if ((ops & OP_READ) == OP_READ) sb.append("(r)");
-// if ((ops & OP_WRITE) == OP_WRITE) sb.append("(w)");
sb.append(" ");
sb.append(uri.getPort());
sb.append(" ");
@@ -212,6 +191,77 @@ public class EchoNet {
System.out.println(sb.toString());
}
+ public void write(URI uri) throws IOException {
+ write(Arrays.asList(uri));
+ }
+
+ public void write(List<URI> uris) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+ for (URI uri : uris) {
+ byte[] b = uri.toString().getBytes("UTF-8");
+ baos.write(b);
+ baos.write(EOF);
+ }
+
+ this.write = ByteBuffer.wrap(baos.toByteArray());
+ }
+
+ public boolean drain() throws IOException {
+ this.channel.write(write);
+ return write.remaining() == 0;
+ }
+
+ public String read() throws IOException {
+
+ if (channel.read(read) == -1) throw new EOFException();
+
+ byte[] buf = read.array();
+
+ int end = endOfText(buf, 0, read.position());
+
+ if (end < 0) return null;
+
+ // Copy the string without the terminator char
+ String text = new String(buf, 0, end, "UTF-8");
+
+ int newPos = read.position() - end;
+ System.arraycopy(buf, end + 1, buf, 0, newPos);
+ read.position(newPos - 1);
+
+ return text;
+ }
+
+ private int endOfText(byte[] data, int offset, int pos) {
+ for (int i = offset; i < pos; i++) if (data[i] == EOF) return
i;
+ return -1;
+ }
+
+ @Override
+ public String toString() {
+ return "Session{" +
+ "uri=" + uri +
+ ", state=" + state +
+ '}';
+ }
+
+ private final long rate = 3000;
+
+ private long last = 0;
+
+ public void tick() throws IOException {
+ if (state != State.HEARTBEAT) return;
+
+ long now = System.currentTimeMillis();
+ long delay = now - last;
+
+ if (delay > rate) {
+ last = now;
+ write(me);
+ state(OP_READ | OP_WRITE, State.HEARTBEAT);
+ }
+
+ }
}
private static enum State {
@@ -223,15 +273,13 @@ public class EchoNet {
private void _run() {
while (true) {
+// try {
+// Thread.sleep(100);
+// } catch (InterruptedException e) {
+// Thread.interrupted();
+// }
try {
-// if (me.getPort() != 5555) Thread.sleep(1000);
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- Thread.interrupted();
- }
- try {
- selector.select();
-// selector.select(2000);
+ selector.select(1000);
} catch (IOException ex) {
ex.printStackTrace();
break;
@@ -239,16 +287,11 @@ public class EchoNet {
Set keys = selector.selectedKeys();
-// println("selection " + keys.size() + " " + keys);
-
Iterator iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = (SelectionKey) iterator.next();
iterator.remove();
-// printOps(key);
-
-
try {
if (key.isAcceptable()) {
@@ -270,15 +313,16 @@ public class EchoNet {
println("accept " + address.getPort());
client.configureBlocking(false);
- register(client, address, OP_READ, true);
+ Session session = new Session(client, address,
null);
+ session.state(OP_READ, State.GREETING);
}
if (key.isConnectable()) {
// we are a client
- SocketChannel channel = (SocketChannel)
key.channel();
- channel.finishConnect();
+ Session session = (Session) key.attachment();
+ session.channel.finishConnect();
// when you are a client, first say high to
everyone
// before accepting data
@@ -292,186 +336,158 @@ public class EchoNet {
// Afterward the server will only pulls its
heartbeat
- key.interestOps(OP_WRITE);
+ session.write(me);
+ session.state(OP_WRITE, State.GREETING);
}
if (key.isReadable()) {
- SocketChannel client = (SocketChannel)
key.channel();
- Session session = (Session) key.attachment();
-
- ByteBuffer output = session.read;
- output.clear();
-
- int i = client.read(output);
+ Session session = (Session) key.attachment();
- String message = new String(output.array(),
output.arrayOffset(), output.position());
+ switch (session.state) {
+ case GREETING: { // read
- if (message.length() == 0) {
- session.println(" --- ");
- return;
- } else {
-// session.println(message);
- }
+ String message = session.read();
- URI uri = URI.create(message);
+ if (message == null) break; // need to
read more
- switch (session.state) {
- case GREETING: { // read
- session.uri = uri;
+ session.setURI(URI.create(message));
session.println("welcome");
- session.state(OP_WRITE, State.LISTING);
-
ArrayList<URI> list = new
ArrayList<URI>(seen);
// When they read themselves on the list
// they'll know it's time to list their
URIs
list.remove(me); // yank
- list.remove(uri); // yank
- list.add(uri); // add to the end
+ list.remove(session.uri); // yank
+ list.add(session.uri); // add to the end
- session.listing = list.iterator();
+ session.write(list);
- }; break;
+ session.state(OP_WRITE, State.LISTING);
- case LISTING: { // read
- session.listed.add(uri);
+ }
+ ;
+ break;
- session.println(message);
+ case LISTING: { // read
- // they listed me, means they want my list
- if (uri.equals(me)) {
+ String message = null;
- // switch to write
- session.state(OP_WRITE, State.LISTING);
+ while ((message = session.read()) != null)
{
- ArrayList<URI> list = new
ArrayList<URI>(seen);
+ URI uri = URI.create(message);
- for (URI reported : session.listed) {
- list.remove(reported);
- }
-
- // When they read us on the list
- // they'll know it's time to switch to
heartbeat
-
- list.remove(session.uri);
- list.remove(me); // yank if in the
middle
- list.add(me); // add to the end
-
- session.listing = list.iterator();
- } else if (uri.equals(session.uri)) {
-
- session.state(OP_WRITE | OP_READ,
State.HEARTBEAT);
-
- } else if (!seen.contains(uri)) {
- try {
- connect(uri);
- } catch (Exception e) {
- println("connect failed " + uri +
" - " + e.getMessage());
- e.printStackTrace();
- }
- } else {
- session.println("ambiguous");
+ session.listed.add(uri);
- session.state(OP_WRITE | OP_READ,
State.HEARTBEAT);
- }
- }; break;
+ session.println(message);
- case HEARTBEAT: { // read
-
- session.println("pong");
+ // they listed me, means they want my
list
+ if (uri.equals(me)) {
+ ArrayList<URI> list = new
ArrayList<URI>(seen);
- }; break;
- }
+ for (URI reported :
session.listed) {
+ list.remove(reported);
+ }
- }
+ // When they read us on the list
+ // they'll know it's time to
switch to heartbeat
- if (key.isWritable()) {
-
- SocketChannel client = (SocketChannel)
key.channel();
- Session session = (Session) key.attachment();
+ list.remove(session.uri);
+ list.remove(me); // yank if in the
middle
+ list.add(me); // add to the end
- switch (session.state) {
- case GREETING: { // write
+ session.write(list);
- session.println("hello");
+ session.state(OP_WRITE,
State.LISTING);
- ByteBuffer output = session.write;
+ } else if (uri.equals(session.uri)) {
- output.clear();
+ session.state(OP_READ,
State.HEARTBEAT);
- output.put(me.toString().getBytes());
+ } else if (!seen.contains(uri)) {
+ try {
+ connect(uri);
+ } catch (Exception e) {
+ println("connect failed " +
uri + " - " + e.getMessage());
+ e.printStackTrace();
+ }
+ }
+ }
- output.flip();
+ }
+ ;
+ break;
- client.write(output);
+ case HEARTBEAT: { // read
- session.state(OP_READ, State.LISTING);
+ String message = session.read();
- }; break;
+ if (message != null) {
+ session.println("pong");
+ }
- case LISTING: { // write
+ }
+ ;
+ break;
+ }
- URI uri = session.listing.next();
+ }
- ByteBuffer output = session.write;
+ if (key.isWritable()) {
- output.clear();
+ Session session = (Session) key.attachment();
- session.println(uri.toString());
+ switch (session.state) {
+ case GREETING: { // write
- output.put(uri.toString().getBytes());
+ if (session.drain()) {
+ session.state(OP_READ, State.LISTING);
+ }
- output.flip();
+ }
+ ;
+ break;
- client.write(output);
+ case LISTING: { // write
- if (!session.listing.hasNext()) {
+ if (session.drain()) {
- // We've just signaled them to
- // go next and list their URIs
- if (uri.equals(session.uri)) {
+ // we haven't ready any URIs yet
+ if (session.listed.size() == 0) {
session.state(OP_READ,
State.LISTING);
- } else if (uri.equals(me)) {
- // we've clearly signaled them that
- // we are done and do not expect
- // to read any URLs
-
- session.state(OP_WRITE |OP_READ,
State.HEARTBEAT);
-
} else {
- session.println("ambiguous state,
switching to heartbeat");
-
- session.state(OP_WRITE |OP_READ,
State.HEARTBEAT);
+ session.state(OP_READ,
State.HEARTBEAT);
}
}
- }; break;
+ }
+ ;
+ break;
case HEARTBEAT: { // write
- ByteBuffer output = session.write;
-
- output.clear();
+ if (session.drain()) {
- output.put(me.toString().getBytes());
+ session.last =
System.currentTimeMillis();
- output.flip();
+ session.println("ping");
- client.write(output);
+ session.state(OP_READ,
State.HEARTBEAT);
- session.println("ping");
+ }
- }; break;
+ }
+ ;
+ break;
}
}
@@ -485,19 +501,23 @@ public class EchoNet {
}
- }
- }
+ for (SelectionKey key : selector.keys()) {
+ Server.Session session = (Session) key.attachment();
+
+ try {
+ if (session != null) session.tick();
+ } catch (IOException ex) {
+ key.cancel();
+ try {
+ key.channel().close();
+ } catch (IOException cex) {
+ }
+ }
+ }
- private void push(ByteBuffer output, Session session) {
- for (Session fellow : clients.values()) {
- // Don't write to the originator
- if (fellow == session) continue;
- fellow.buffers.addLast(output);
- fellow.key.interestOps(OP_READ | OP_WRITE);
}
}
-
public void connect(Server s) throws Exception {
connect(s.port);
}
@@ -508,7 +528,7 @@ public class EchoNet {
public void connect(URI uri) throws Exception {
if (me.equals(uri)) return;
-
+
int port = uri.getPort();
String host = uri.getHost();
@@ -522,41 +542,19 @@ public class EchoNet {
socketChannel.connect(address);
- Session session = register(socketChannel, address,
SelectionKey.OP_CONNECT, false);
- session.uri = uri;
+ Session session = new Session(socketChannel, address, uri);
+ session.ops(OP_CONNECT);
// seen - needs to get maintained as "connected"
// TODO remove from seen
- seen.add(uri);
} catch (IOException e) {
throw new RuntimeException(e);
}
-// println("++ " + port);
}
- private final Map<SocketAddress, Session> clients = new
ConcurrentHashMap<SocketAddress, Session>();
-// private final List<Session> clients = new ArrayList<Session>();
-
- private Session register(SocketChannel client, InetSocketAddress
address, int ops, boolean server) throws IOException {
-// println("Registering " + address);
- Session session = new Session(server);
- session.key = client.register(selector, ops, session);
- session.address = address;
- session.uri = URI.create("conn://" + address.getHostName() + ":" +
address.getPort());
-
-// clients.add(session);
-// Session duplicate = clients.put(address, session);
-// if (duplicate != null) {
-// println("duplicate = " + duplicate + " " + address);
-// duplicate.key.channel().close();
-// println("closed duplicate " + address);
-// }
-//
-// for (SocketAddress node : clients.keySet()) {
-// println("node " + node);
-// }
-
- return session;
+ private void seen(URI uri) {
+ println("seen " + uri);
+ seen.add(uri);
}
private void println(String s) {