Author: dblevins
Date: Tue Apr  6 15:27:00 2010
New Revision: 931184

URL: http://svn.apache.org/viewvc?rev=931184&view=rev
Log:
Looking good.  Added the real IO code.  All nodes find each other directly or 
indirectly.  Heartbeat happens at a configurable interval.  All with one thread.

Modified:
    
openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/EchoNet.java

Modified: 
openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/EchoNet.java
URL: 
http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/EchoNet.java?rev=931184&r1=931183&r2=931184&view=diff
==============================================================================
--- 
openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/EchoNet.java
 (original)
+++ 
openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/EchoNet.java
 Tue Apr  6 15:27:00 2010
@@ -16,27 +16,28 @@
  */
 package org.apache.openejb.server.discovery;
 
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
-import java.net.SocketAddress;
 import java.net.URI;
 import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SelectionKey;
+import static java.nio.channels.SelectionKey.OP_CONNECT;
 import static java.nio.channels.SelectionKey.OP_READ;
 import static java.nio.channels.SelectionKey.OP_WRITE;
 import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 
 /**
@@ -45,19 +46,13 @@ import java.util.concurrent.CountDownLat
 public class EchoNet {
 
     public static void main(String[] args) throws Exception {
-        listen(3333);
-
         Server a = new Server(4444).start();
         Server b = new Server(5555).start();
-//        Server c = new Server(6666).start();
-        a.connect(3333);
-//        b.connect(3333);
+        Server c = new Server(6666).start();
         a.connect(b);
-//        b.connect(a);
-//        b.connect(c);
-//        c.connect(a);
-//        c.connect(b);
+        c.connect(b);
 
+        // A and C should hookup through B
 
         new CountDownLatch(1).await();
     }
@@ -78,58 +73,51 @@ public class EchoNet {
         ServerSocket serverSocket = new ServerSocket(port);
         while (true) {
             final Socket socket = serverSocket.accept();
-//            new Thread(new Runnable() {
-//                public void run() {
-//                    try {
-//                        read(port, socket);
-//                    } catch (IOException e) {
-//                        e.printStackTrace();
-//                    }
-//                }
-//            }).start();
+            new Thread(new Runnable() {
+                public void run() {
+                    try {
+                        read(port, socket);
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                    }
+                }
+            }).start();
         }
     }
 
     private static void read(int port, Socket socket) throws IOException {
-//        System.out.println(port + " - accept = " + 
socket.getRemoteSocketAddress());
+        System.out.println(port + " - accept = " + 
socket.getRemoteSocketAddress());
         InputStream in = socket.getInputStream();
         int i = -1;
         byte[] buffer = new byte[1024];
         while ((i = in.read(buffer)) != -1) {
-            buffer[i++] = (byte)'\n';
-//            System.out.write(buffer, 0, i);
+            buffer[i++] = (byte) '\n';
+            System.out.write(buffer, 0, i);
         }
     }
 
     public static class Server {
-        private int port;
-        private Selector selector;
-
+        private final int port;
+        private final Selector selector;
         private final URI me;
 
-        public Server(int port) {
+        public Server(int port) throws IOException {
             this.port = port;
 
             me = URI.create("conn://localhost:" + port);
 
-            ServerSocketChannel serverChannel;
-            try {
-                serverChannel = ServerSocketChannel.open();
-                ServerSocket ss = serverChannel.socket();
-                InetSocketAddress address = new InetSocketAddress(port);
-                ss.bind(address);
-                serverChannel.configureBlocking(false);
+            ServerSocketChannel serverChannel = ServerSocketChannel.open();
 
-                selector = Selector.open();
+            ServerSocket serverSocket = serverChannel.socket();
+            InetSocketAddress address = new InetSocketAddress(port);
+            serverSocket.bind(address);
+            serverChannel.configureBlocking(false);
 
-                serverChannel.register(selector, SelectionKey.OP_ACCEPT);
+            selector = Selector.open();
 
-                println("Listening");
+            serverChannel.register(selector, SelectionKey.OP_ACCEPT);
 
-            } catch (IOException ex) {
-                ex.printStackTrace();
-                return;
-            }
+            println("Listening");
         }
 
         public Server start() {
@@ -145,42 +133,30 @@ public class EchoNet {
             return this;
         }
 
-
-        private void printOps(SelectionKey key) {
-            StringBuilder sb = new StringBuilder(key.hashCode() + "  opts: ");
-            if (key.isAcceptable()) sb.append("a");
-            if (key.isConnectable()) sb.append("c");
-            if (key.isReadable()) sb.append("r");
-            if (key.isWritable()) sb.append("w");
-            println(sb.toString());
-        }
-
         public class Session {
-            private final LinkedList<ByteBuffer> buffers = new 
LinkedList<ByteBuffer>();
 
-            private final ByteBuffer read = ByteBuffer.allocate(1024);
-            private final ByteBuffer write = ByteBuffer.allocate(1024);
-
-            public SelectionKey key;
+            private static final int EOF = 3;
 
-            public InetSocketAddress address;
+            private final SocketChannel channel;
+            private final ByteBuffer read = ByteBuffer.allocate(1024);
+            private final SelectionKey key;
+            private final InetSocketAddress address;
+            private final List<URI> listed = new ArrayList<URI>();
 
+            private ByteBuffer write;
             private State state = State.GREETING;
-            private final boolean server;
-            private Iterator<URI> listing;
-            private List<URI> listed = new ArrayList<URI>();
-            public URI uri;
+            private URI uri;
 
-            public Session(boolean server) {
-                this.server = server;
+            public Session(SocketChannel channel, InetSocketAddress address, 
URI uri) throws ClosedChannelException {
+                this.channel = channel;
+                this.address = address;
+                this.uri = uri != null ? uri : URI.create("conn://" + 
address.getHostName() + ":" + address.getPort());
+                this.key = channel.register(selector, 0, this);
             }
 
-            public ByteBuffer pop() {
-                try {
-                    return buffers.removeFirst();
-                } finally {
-                    if (buffers.size() == 0) key.interestOps(OP_READ);
-                }
+            public Session ops(int ops) {
+                key.interestOps(ops);
+                return this;
             }
 
             private void println(String s) {
@@ -192,7 +168,12 @@ public class EchoNet {
                 this.state = state;
                 key.interestOps(ops);
 
-                trace("*");
+//                trace("*");
+            }
+
+            public void setURI(URI uri) {
+                seen(uri);
+                this.uri = uri;
             }
 
             private void trace(String str) {
@@ -201,8 +182,6 @@ public class EchoNet {
                 sb.append(" ");
                 if ((key.interestOps() & OP_READ) == OP_READ) sb.append("<");
                 if ((key.interestOps() & OP_WRITE) == OP_WRITE) sb.append(">");
-//                if ((ops & OP_READ) == OP_READ) sb.append("(r)");
-//                if ((ops & OP_WRITE) == OP_WRITE) sb.append("(w)");
                 sb.append(" ");
                 sb.append(uri.getPort());
                 sb.append(" ");
@@ -212,6 +191,77 @@ public class EchoNet {
                 System.out.println(sb.toString());
             }
 
+            public void write(URI uri) throws IOException {
+                write(Arrays.asList(uri));
+            }
+
+            public void write(List<URI> uris) throws IOException {
+                ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+                for (URI uri : uris) {
+                    byte[] b = uri.toString().getBytes("UTF-8");
+                    baos.write(b);
+                    baos.write(EOF);
+                }
+
+                this.write = ByteBuffer.wrap(baos.toByteArray());
+            }
+
+            public boolean drain() throws IOException {
+                this.channel.write(write);
+                return write.remaining() == 0;
+            }
+
+            public String read() throws IOException {
+
+                if (channel.read(read) == -1) throw new EOFException();
+
+                byte[] buf = read.array();
+
+                int end = endOfText(buf, 0, read.position());
+
+                if (end < 0) return null;
+
+                // Copy the string without the terminator char
+                String text = new String(buf, 0, end, "UTF-8");
+
+                int newPos = read.position() - end;
+                System.arraycopy(buf, end + 1, buf, 0, newPos);
+                read.position(newPos - 1);
+
+                return text;
+            }
+
+            private int endOfText(byte[] data, int offset, int pos) {
+                for (int i = offset; i < pos; i++) if (data[i] == EOF) return 
i;
+                return -1;
+            }
+
+            @Override
+            public String toString() {
+                return "Session{" +
+                        "uri=" + uri +
+                        ", state=" + state +
+                        '}';
+            }
+
+            private final long rate = 3000;
+
+            private long last = 0;
+
+            public void tick() throws IOException {
+                if (state != State.HEARTBEAT) return;
+
+                long now = System.currentTimeMillis();
+                long delay = now - last;
+
+                if (delay > rate) {
+                    last = now;
+                    write(me);
+                    state(OP_READ | OP_WRITE, State.HEARTBEAT);
+                }
+
+            }
         }
 
         private static enum State {
@@ -223,15 +273,13 @@ public class EchoNet {
         private void _run() {
             while (true) {
 
+//                try {
+//                    Thread.sleep(100);
+//                } catch (InterruptedException e) {
+//                    Thread.interrupted();
+//                }
                 try {
-//                    if (me.getPort() != 5555) Thread.sleep(1000);
-                    Thread.sleep(1000);
-                } catch (InterruptedException e) {
-                    Thread.interrupted();
-                }
-                try {
-                    selector.select();
-//                    selector.select(2000);
+                    selector.select(1000);
                 } catch (IOException ex) {
                     ex.printStackTrace();
                     break;
@@ -239,16 +287,11 @@ public class EchoNet {
 
                 Set keys = selector.selectedKeys();
 
-//                println("selection " + keys.size() + "  " + keys);
-
                 Iterator iterator = keys.iterator();
                 while (iterator.hasNext()) {
                     SelectionKey key = (SelectionKey) iterator.next();
                     iterator.remove();
 
-//                    printOps(key);
-
-
                     try {
                         if (key.isAcceptable()) {
 
@@ -270,15 +313,16 @@ public class EchoNet {
                             println("accept " + address.getPort());
                             client.configureBlocking(false);
 
-                            register(client, address, OP_READ, true);
+                            Session session = new Session(client, address, 
null);
+                            session.state(OP_READ, State.GREETING);
                         }
 
                         if (key.isConnectable()) {
 
                             // we are a client
 
-                            SocketChannel channel = (SocketChannel) 
key.channel();
-                            channel.finishConnect();
+                            Session session = (Session) key.attachment();
+                            session.channel.finishConnect();
 
                             // when you are a client, first say high to 
everyone
                             // before accepting data
@@ -292,186 +336,158 @@ public class EchoNet {
 
                             // Afterward the server will only pulls its 
heartbeat
 
-                            key.interestOps(OP_WRITE);
+                            session.write(me);
 
+                            session.state(OP_WRITE, State.GREETING);
                         }
 
                         if (key.isReadable()) {
 
-                            SocketChannel client = (SocketChannel) 
key.channel();
-                            Session session = (Session) key.attachment();
-
-                            ByteBuffer output = session.read;
 
-                            output.clear();
-
-                            int i = client.read(output);
+                            Session session = (Session) key.attachment();
 
-                            String message = new String(output.array(), 
output.arrayOffset(), output.position());
+                            switch (session.state) {
+                                case GREETING: { // read
 
-                            if (message.length() == 0) {
-                                session.println(" --- ");
-                                return;
-                            } else {
-//                                session.println(message);
-                            }
+                                    String message = session.read();
 
-                            URI uri = URI.create(message);
+                                    if (message == null) break; // need to 
read more
 
-                            switch (session.state) {
-                                case GREETING: { // read
-                                    session.uri = uri;
+                                    session.setURI(URI.create(message));
 
                                     session.println("welcome");
 
-                                    session.state(OP_WRITE, State.LISTING);
-
                                     ArrayList<URI> list = new 
ArrayList<URI>(seen);
 
                                     // When they read themselves on the list
                                     // they'll know it's time to list their 
URIs
 
                                     list.remove(me); // yank
-                                    list.remove(uri); // yank
-                                    list.add(uri); // add to the end
+                                    list.remove(session.uri); // yank
+                                    list.add(session.uri); // add to the end
 
-                                    session.listing = list.iterator();
+                                    session.write(list);
 
-                                }; break;
+                                    session.state(OP_WRITE, State.LISTING);
 
-                                case LISTING: { // read
 
-                                    session.listed.add(uri);
+                                }
+                                ;
+                                break;
 
-                                    session.println(message);
+                                case LISTING: { // read
 
-                                    // they listed me, means they want my list
-                                    if (uri.equals(me)) {
+                                    String message = null;
 
-                                        // switch to write
-                                        session.state(OP_WRITE, State.LISTING);
+                                    while ((message = session.read()) != null) 
{
 
-                                        ArrayList<URI> list = new 
ArrayList<URI>(seen);
+                                        URI uri = URI.create(message);
 
-                                        for (URI reported : session.listed) {
-                                            list.remove(reported);
-                                        }
-                                        
-                                        // When they read us on the list
-                                        // they'll know it's time to switch to 
heartbeat
-
-                                        list.remove(session.uri);
-                                        list.remove(me); // yank if in the 
middle
-                                        list.add(me); // add to the end
-
-                                        session.listing = list.iterator();
-                                    } else if (uri.equals(session.uri)) {
-
-                                        session.state(OP_WRITE | OP_READ, 
State.HEARTBEAT);
-
-                                    } else if (!seen.contains(uri)) {
-                                        try {
-                                            connect(uri);
-                                        } catch (Exception e) {
-                                            println("connect failed " + uri + 
" - " + e.getMessage());
-                                            e.printStackTrace();
-                                        }
-                                    } else {
-                                        session.println("ambiguous");
+                                        session.listed.add(uri);
 
-                                        session.state(OP_WRITE | OP_READ, 
State.HEARTBEAT);
-                                    }
-                                }; break;
+                                        session.println(message);
 
-                                case HEARTBEAT: { // read
-
-                                    session.println("pong");
+                                        // they listed me, means they want my 
list
+                                        if (uri.equals(me)) {
+                                            ArrayList<URI> list = new 
ArrayList<URI>(seen);
 
-                                }; break;
-                            }
+                                            for (URI reported : 
session.listed) {
+                                                list.remove(reported);
+                                            }
 
-                        }
+                                            // When they read us on the list
+                                            // they'll know it's time to 
switch to heartbeat
 
-                        if (key.isWritable()) {
-
-                            SocketChannel client = (SocketChannel) 
key.channel();
-                            Session session = (Session) key.attachment();
+                                            list.remove(session.uri);
+                                            list.remove(me); // yank if in the 
middle
+                                            list.add(me); // add to the end
 
-                            switch (session.state) {
-                                case GREETING: { // write
+                                            session.write(list);
 
-                                    session.println("hello");
+                                            session.state(OP_WRITE, 
State.LISTING);
 
-                                    ByteBuffer output = session.write;
+                                        } else if (uri.equals(session.uri)) {
 
-                                    output.clear();
+                                            session.state(OP_READ, 
State.HEARTBEAT);
 
-                                    output.put(me.toString().getBytes());
+                                        } else if (!seen.contains(uri)) {
+                                            try {
+                                                connect(uri);
+                                            } catch (Exception e) {
+                                                println("connect failed " + 
uri + " - " + e.getMessage());
+                                                e.printStackTrace();
+                                            }
+                                        }
+                                    }
 
-                                    output.flip();
+                                }
+                                ;
+                                break;
 
-                                    client.write(output);
+                                case HEARTBEAT: { // read
 
-                                    session.state(OP_READ, State.LISTING);
+                                    String message = session.read();
 
-                                }; break;
+                                    if (message != null) {
+                                        session.println("pong");
+                                    }
 
-                                case LISTING: { // write
+                                }
+                                ;
+                                break;
+                            }
 
-                                    URI uri = session.listing.next();
+                        }
 
-                                    ByteBuffer output = session.write;
+                        if (key.isWritable()) {
 
-                                    output.clear();
+                            Session session = (Session) key.attachment();
 
-                                    session.println(uri.toString());
+                            switch (session.state) {
+                                case GREETING: { // write
 
-                                    output.put(uri.toString().getBytes());
+                                    if (session.drain()) {
+                                        session.state(OP_READ, State.LISTING);
+                                    }
 
-                                    output.flip();
+                                }
+                                ;
+                                break;
 
-                                    client.write(output);
+                                case LISTING: { // write
 
-                                    if (!session.listing.hasNext()) {
+                                    if (session.drain()) {
 
-                                        // We've just signaled them to
-                                        // go next and list their URIs
-                                        if (uri.equals(session.uri)) {
+                                        // we haven't read any URIs yet
+                                        if (session.listed.size() == 0) {
 
                                             session.state(OP_READ, 
State.LISTING);
 
-                                        } else if (uri.equals(me)) {
-                                            // we've clearly signaled them that
-                                            // we are done and do not expect
-                                            // to read any URLs
-
-                                            session.state(OP_WRITE |OP_READ, 
State.HEARTBEAT);
-
                                         } else {
 
-                                            session.println("ambiguous state, 
switching to heartbeat");
-
-                                            session.state(OP_WRITE |OP_READ, 
State.HEARTBEAT);
+                                            session.state(OP_READ, 
State.HEARTBEAT);
 
                                         }
                                     }
-                                }; break;
+                                }
+                                ;
+                                break;
 
                                 case HEARTBEAT: { // write
 
-                                    ByteBuffer output = session.write;
-
-                                    output.clear();
+                                    if (session.drain()) {
 
-                                    output.put(me.toString().getBytes());
+                                        session.last = 
System.currentTimeMillis();
 
-                                    output.flip();
+                                        session.println("ping");
 
-                                    client.write(output);
+                                        session.state(OP_READ, 
State.HEARTBEAT);
 
-                                    session.println("ping");
+                                    }
 
-                                }; break;
+                                }
+                                ;
+                                break;
                             }
                         }
 
@@ -485,19 +501,23 @@ public class EchoNet {
 
                 }
 
-            }
-        }
+                for (SelectionKey key : selector.keys()) {
+                    Server.Session session = (Session) key.attachment();
+
+                    try {
+                        if (session != null) session.tick();
+                    } catch (IOException ex) {
+                        key.cancel();
+                        try {
+                            key.channel().close();
+                        } catch (IOException cex) {
+                        }
+                    }
+                }
 
-        private void push(ByteBuffer output, Session session) {
-            for (Session fellow : clients.values()) {
-                // Don't write to the originator
-                if (fellow == session) continue;
-                fellow.buffers.addLast(output);
-                fellow.key.interestOps(OP_READ | OP_WRITE);
             }
         }
 
-
         public void connect(Server s) throws Exception {
             connect(s.port);
         }
@@ -508,7 +528,7 @@ public class EchoNet {
 
         public void connect(URI uri) throws Exception {
             if (me.equals(uri)) return;
-            
+
             int port = uri.getPort();
             String host = uri.getHost();
 
@@ -522,41 +542,19 @@ public class EchoNet {
 
                 socketChannel.connect(address);
 
-                Session session = register(socketChannel, address, 
SelectionKey.OP_CONNECT, false);
-                session.uri = uri;
+                Session session = new Session(socketChannel, address, uri);
+                session.ops(OP_CONNECT);
 
                 // seen - needs to get maintained as "connected"
                 // TODO remove from seen
-                seen.add(uri);
             } catch (IOException e) {
                 throw new RuntimeException(e);
             }
-//            println("++ " + port);
         }
 
-        private final Map<SocketAddress, Session> clients = new 
ConcurrentHashMap<SocketAddress, Session>();
-//        private final List<Session> clients = new ArrayList<Session>();
-
-        private Session register(SocketChannel client, InetSocketAddress 
address, int ops, boolean server) throws IOException {
-//            println("Registering " + address);
-            Session session = new Session(server);
-            session.key = client.register(selector, ops, session);
-            session.address = address;
-            session.uri = URI.create("conn://" + address.getHostName() + ":" + 
address.getPort());
-
-//            clients.add(session);
-//            Session duplicate = clients.put(address, session);
-//            if (duplicate != null) {
-//                println("duplicate = " + duplicate + "  " + address);
-//                duplicate.key.channel().close();
-//                println("closed duplicate " + address);
-//            }
-//
-//            for (SocketAddress node : clients.keySet()) {
-//                println("node " + node);
-//            }
-
-            return session;
+        private void seen(URI uri) {
+            println("seen " + uri);
+            seen.add(uri);
         }
 
         private void println(String s) {


Reply via email to