Author: dblevins Date: Wed Dec 21 22:48:19 2011 New Revision: 1221925 URL: http://svn.apache.org/viewvc?rev=1221925&view=rev Log: svn merge -r 1221923:1221924 https://svn.apache.org/repos/asf/openejb/trunk/openejb
http://svn.apache.org/viewvc?view=revision&revision=1221924 ------------------------------------------------------------------------ r1221924 | dblevins | 2011-12-21 14:46:48 -0800 (Wed, 21 Dec 2011) | 3 lines OPENEJB-1729: Reliability of Multipoint remove event when last peer disappears OPENEJB-1730: Reliability of multipoint discovery heartrates less than 1 second ------------------------------------------------------------------------ Modified: openejb/branches/openejb-3.1.x/ (props changed) openejb/branches/openejb-3.1.x/container/openejb-core/src/test/java/org/apache/openejb/config/BusinessInterfacesTest.java (props changed) openejb/branches/openejb-3.1.x/examples/alternate-descriptors/src/main/resources/META-INF/test.ejb-jar.xml (props changed) openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/Tracker.java openejb/branches/openejb-3.1.x/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgentTest.java Propchange: openejb/branches/openejb-3.1.x/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Dec 21 22:48:19 2011 @@ -1,3 +1,3 @@ /openejb/branches/openejb-3.1.1:779593 -/openejb/trunk/openejb:1182222 +/openejb/trunk/openejb:1182222,1221924 /openejb/trunk/openejb3:943472,943862,943965,944757,945989,946399,946485,946489,946705,946792,946805,946814,946861,946863-946864,947010,947017,947042,948022,948241,948243,948548,949014,949233,950391,950801,951611,953191,953196,953556,955104,955496,957463,962382,962750,987030,1004172,1029528 Propchange: openejb/branches/openejb-3.1.x/container/openejb-core/src/test/java/org/apache/openejb/config/BusinessInterfacesTest.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Dec 21 22:48:19 2011 @@ -1,3 +1,3 @@ /openejb/branches/openejb-3.1.1/container/openejb-core/src/test/java/org/apache/openejb/config/UberInterfaceTest.java:779593 -/openejb/trunk/openejb/container/openejb-core/src/test/java/org/apache/openejb/config/BusinessInterfacesTest.java:1182222 +/openejb/trunk/openejb/container/openejb-core/src/test/java/org/apache/openejb/config/BusinessInterfacesTest.java:1182222,1221924 /openejb/trunk/openejb3/container/openejb-core/src/test/java/org/apache/openejb/config/BusinessInterfacesTest.java:943472,943862,943965,944757,945989,946399,946485,946489,946705,946792,946805,946814,946861,946863-946864,947010,947017,947042,948022,948241,948548,949014,949233,950391,950801,951611,953191,953196,953556,955104,955496,957463,962382,962750,987030,1004172,1029528 Propchange: openejb/branches/openejb-3.1.x/examples/alternate-descriptors/src/main/resources/META-INF/test.ejb-jar.xml ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Dec 21 22:48:19 2011 @@ -1,3 +1,3 @@ /openejb/branches/openejb-3.1.1/examples/alternate-descriptors/src/main/resources/META-INF/ejb-jar.xml:779593 -/openejb/trunk/openejb/examples/alternate-descriptors/src/main/resources/META-INF/test.ejb-jar.xml:1182222 +/openejb/trunk/openejb/examples/alternate-descriptors/src/main/resources/META-INF/test.ejb-jar.xml:1182222,1221924 /openejb/trunk/openejb3/examples/alternate-descriptors/src/main/resources/META-INF/test.ejb-jar.xml:943472,943862,943965,944757,945989,946399,946485,946489,946705,946792,946805,946814,946861,946863-946864,947010,947017,947042,948022,948241,948243,948548,949014,949233,950391,950801,951611,953191,953196,953556,955104,955496,957463,962382,962750,987030,1029528 Modified: openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java URL: http://svn.apache.org/viewvc/openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java?rev=1221925&r1=1221924&r2=1221925&view=diff ============================================================================== --- openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java (original) +++ openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java Wed Dec 21 22:48:19 2011 @@ -53,6 +53,16 @@ public class MultipointDiscoveryAgent im private Tracker tracker; private MultipointServer multipointServer; + private boolean debug; + private String name; + + public MultipointDiscoveryAgent() { + } + + public MultipointDiscoveryAgent(boolean debug, String name) { + this.debug = debug; + this.name = name; + } public void init(Properties props) { @@ -73,6 +83,7 @@ public class MultipointDiscoveryAgent im builder.setReconnectDelay(options.get("reconnect_delay", builder.getReconnectDelay())); builder.setExponentialBackoff(options.get("exponential_backoff", builder.getExponentialBackoff())); builder.setMaxReconnectAttempts(options.get("max_reconnect_attempts", builder.getMaxReconnectAttempts())); + builder.setDebug(debug); tracker = builder.build(); } @@ -121,7 +132,7 @@ public class MultipointDiscoveryAgent im try { if (running.compareAndSet(false, true)) { - multipointServer = new MultipointServer(host, port, tracker).start(); + multipointServer = new MultipointServer(host, port, tracker, name, debug).start(); this.port = multipointServer.getPort(); Modified: openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java URL: http://svn.apache.org/viewvc/openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java?rev=1221925&r1=1221924&r2=1221925&view=diff ============================================================================== --- openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java (original) +++ openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java Wed Dec 21 22:48:19 2011 @@ -16,6 +16,7 @@ */ package org.apache.openejb.server.discovery; +import org.apache.openejb.util.Join; import org.apache.openejb.util.LogCategory; import org.apache.openejb.util.Logger; @@ -42,7 +43,9 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -52,30 +55,46 @@ public class MultipointServer { private static final Logger log = Logger.getInstance(LogCategory.OPENEJB_SERVER.createChild("discovery"), MultipointServer.class); private static final URI END_LIST = URI.create("end:list"); - + private final String host; private final int port; private final Selector selector; private final URI me; + /** + * Only used for toString to make debugging easier + */ + private final String name; + + private final Tracker tracker; private final LinkedList<URI> connect = new LinkedList<URI>(); private final Map<URI, Session> connections = new HashMap<URI, Session>(); + private boolean debug = false; public MultipointServer(int port, Tracker tracker) throws IOException { this("localhost", port, tracker); } public MultipointServer(String host, int port, Tracker tracker) throws IOException { + this(host, port, tracker, randomColor()); + } + + public MultipointServer(String host, int port, Tracker tracker, String name) throws IOException { + this(host, port, tracker, name, false); + } + + public MultipointServer(String host, int port, Tracker tracker, String name, boolean debug) throws IOException { if (tracker == null) throw new NullPointerException("tracker cannot be null"); this.host = host; this.tracker = tracker; - + this.name = name; + this.debug = debug; ServerSocketChannel serverChannel = ServerSocketChannel.open(); ServerSocket serverSocket = serverChannel.socket(); - InetSocketAddress address = new InetSocketAddress(host,port); + InetSocketAddress address = new InetSocketAddress(host, port); serverSocket.bind(address); serverChannel.configureBlocking(false); @@ -100,7 +119,7 @@ public class MultipointServer { _run(); } }); - thread.setName("Server." + port); + thread.setName(Join.join(".", "MultipointServer", name, port)); thread.start(); } return this; @@ -150,7 +169,7 @@ public class MultipointServer { private void trace(String str) { // println(message(str)); - if (log.isDebugEnabled()) { + if (debug && log.isDebugEnabled()) { log.debug(message(str)); } } @@ -164,7 +183,9 @@ public class MultipointServer { } private String message(String str) { - StringBuilder sb = new StringBuilder(); + final StringBuilder sb = new StringBuilder(); + sb.append(name); + sb.append(":"); sb.append(port); sb.append(" "); if (key.isValid()) { @@ -189,11 +210,11 @@ public class MultipointServer { } public void write(Collection<?> uris) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); for (Object uri : uris) { - String s = uri.toString(); - byte[] b = s.getBytes("UTF-8"); + final String s = uri.toString(); + final byte[] b = s.getBytes("UTF-8"); baos.write(b); baos.write(EOF); } @@ -210,16 +231,16 @@ public class MultipointServer { if (channel.read(read) == -1) throw new EOFException(); - byte[] buf = read.array(); + final byte[] buf = read.array(); - int end = endOfText(buf, 0, read.position()); + final int end = endOfText(buf, 0, read.position()); if (end < 0) return null; // Copy the string without the terminator char - String text = new String(buf, 0, end, "UTF-8"); + final String text = new String(buf, 0, end, "UTF-8"); - int newPos = read.position() - end; + final int newPos = read.position() - end; System.arraycopy(buf, end + 1, buf, 0, newPos - 1); read.position(newPos - 1); @@ -248,10 +269,10 @@ public class MultipointServer { public void tick() throws IOException { if (state != State.HEARTBEAT) return; - long now = System.currentTimeMillis(); - long delay = now - last; + final long now = System.currentTimeMillis(); + final long delay = now - last; - if (delay > tracker.getHeartRate()) { + if (delay >= tracker.getHeartRate()) { last = now; heartbeat(); } @@ -266,8 +287,6 @@ public class MultipointServer { } write(strings); state(SelectionKey.OP_READ | SelectionKey.OP_WRITE, State.HEARTBEAT); - - tracker.checkServices(); } } @@ -278,342 +297,379 @@ public class MultipointServer { private final AtomicBoolean running = new AtomicBoolean(); private void _run() { + + // The selectorTimeout ensures that even when there are no IO events, + // this loop will "wake up" and execute at least as frequently as the + // expected heartrate. + // + // We initiate WRITE events (the heartbeats we send) in this loop, so that + // detail is critical. + long selectorTimeout = tracker.getHeartRate(); + + // For reliability purposes we will actually adjust the selectorTimeout + // on each iteration of the loop, shrinking it down just a little to a + // account for the execution time of the loop itself. + while (running.get()) { + + final long start = System.nanoTime(); + try { - selector.select(1000); + selector.select(selectorTimeout); } catch (IOException ex) { ex.printStackTrace(); break; } - Set keys = selector.selectedKeys(); - - Iterator iterator = keys.iterator(); + final Set keys = selector.selectedKeys(); + final Iterator iterator = keys.iterator(); while (iterator.hasNext()) { - SelectionKey key = (SelectionKey) iterator.next(); + final SelectionKey key = (SelectionKey) iterator.next(); iterator.remove(); try { - if (key.isAcceptable()) { - - // we are a server + if (key.isAcceptable()) doAccept(key); - // when you are a server, we must first listen for the - // address of the client before sending data. + if (key.isConnectable()) doConnect(key); - // once they send us their address, we will send our - // full list of known addresses, followed by the "end" - // address to signal that we are done. + if (key.isReadable()) doRead(key); - // Afterward we will only pulls our heartbeat + if (key.isWritable()) doWrite(key); - ServerSocketChannel server = (ServerSocketChannel) key.channel(); - SocketChannel client = server.accept(); - InetSocketAddress address = (InetSocketAddress) client.socket().getRemoteSocketAddress(); + } catch (CancelledKeyException ex) { + synchronized (connect) { + final Session session = (Session) key.attachment(); + if (session.state != State.CLOSED) { + close(key); + } + } + } catch (ClosedChannelException ex) { + synchronized (connect) { + final Session session = (Session) key.attachment(); + if (session.state != State.CLOSED) { + close(key); + } + } + } catch (IOException ex) { + final Session session = (Session) key.attachment(); + session.trace(ex.getClass().getSimpleName() + ": " + ex.getMessage()); + close(key); + } - client.configureBlocking(false); + } - Session session = new Session(client, address, null); - session.trace("accept"); - session.state(java.nio.channels.SelectionKey.OP_READ, State.GREETING); - } + // This loop can generate WRITE keys (the heartbeats we send) + for (SelectionKey key : selector.keys()) { + final Session session = (Session) key.attachment(); - if (key.isConnectable()) { + try { + if (session != null && session.state == State.HEARTBEAT) session.tick(); + } catch (IOException ex) { + close(key); + } + } - // we are a client + // Here is where we actually will expire missing services + tracker.checkServices(); - Session session = (Session) key.attachment(); - session.channel.finishConnect(); - session.trace("connected"); + initiateConnections(); - // when you are a client, first say high to everyone - // before accepting data + selectorTimeout = adjustedSelectorTimeout(start); + } + } - // once a server reads our address, it will send it's - // full list of known addresses, followed by the "end" - // address to signal that it is done. + private long adjustedSelectorTimeout(long start) { + final long end = System.nanoTime(); + final long elapsed = TimeUnit.NANOSECONDS.toMillis(end - start); + final long heartRate = tracker.getHeartRate(); - // we will then send our full list of known addresses, - // followed by the "end" address to signal we are done. + return Math.max(1, heartRate - elapsed); + } - // Afterward the server will only pulls its heartbeat + private void initiateConnections() { + synchronized (connect) { + while (connect.size() > 0) { - // separately, we will initiate connections to everyone - // in the list who we have not yet seen. + final URI uri = connect.removeFirst(); - // WRITE our GREETING - session.write(me); + if (connections.containsKey(uri)) continue; - session.state(java.nio.channels.SelectionKey.OP_WRITE, State.GREETING); - } + final int port = uri.getPort(); + final String host = uri.getHost(); - if (key.isReadable()) { + try { + println("open " + uri); + // Create a non-blocking NIO channel + final SocketChannel socketChannel = SocketChannel.open(); + socketChannel.configureBlocking(false); - Session session = (Session) key.attachment(); + final InetSocketAddress address = new InetSocketAddress(host, port); - switch (session.state) { - case GREETING: { // read + socketChannel.connect(address); - // This state is only reachable as a SERVER - // The client connected and said hello by sending - // its URI to let us know who they are + final Session session = new Session(socketChannel, address, uri); + session.ops(SelectionKey.OP_CONNECT); + session.trace("client"); + connections.put(session.uri, session); - // Once this is read, the client will expect us - // to send our full list of URIs followed by the - // "end" address. + // seen - needs to get maintained as "connected" + // TODO remove from seen + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + } - // So we switch to WRITE LISTING and they switch - // to READ LISTING + private void doWrite(SelectionKey key) throws IOException { + final Session session = (Session) key.attachment(); - // Then we will switch to READ LISTING and they - // will switch to WRITE LISTING - - String message = session.read(); + switch (session.state) { + case GREETING: { // write - if (message == null) break; // need to read more + // Only CLIENTs write a GREETING message + // As we are a client, the first thing we do + // is READ the server's LIST - session.setURI(URI.create(message)); + if (session.drain()) { + session.state(SelectionKey.OP_READ, State.LISTING); + } - connected(session); + } + break; - session.trace("welcome"); + case LISTING: { // write - ArrayList<URI> list = connections(); + if (session.drain()) { - // When they read themselves on the list - // they'll know it's time to list their URIs + if (session.client) { + // CLIENTs list last, so at this point we've read + // the server's list and have written ours - list.remove(me); // yank - list.add(END_LIST); // add to the end + session.trace("DONE WRITING"); - session.write(list); + session.state(SelectionKey.OP_READ, State.HEARTBEAT); - session.state(java.nio.channels.SelectionKey.OP_WRITE, State.LISTING); + } else { + // SERVERs always write their list first, so at this + // point we switch to LIST READ mode - session.trace("STARTING"); + session.state(SelectionKey.OP_READ, State.LISTING); + } + } + } + break; - } - break; + case HEARTBEAT: { // write - case LISTING: { // read + if (session.drain()) { - String message = null; + session.last = System.currentTimeMillis(); - while ((message = session.read()) != null) { + session.trace("send"); - session.trace(message); + session.state(SelectionKey.OP_READ, State.HEARTBEAT); - URI uri = URI.create(message); + } - if (END_LIST.equals(uri)) { + } + break; + } + } - if (session.client) { + private void doRead(SelectionKey key) throws IOException { + final Session session = (Session) key.attachment(); - ArrayList<URI> list = connections(); + switch (session.state) { + case GREETING: { // read - for (URI reported : session.listed) { - list.remove(reported); - } + // This state is only reachable as a SERVER + // The client connected and said hello by sending + // its URI to let us know who they are - // When they read us on the list - // they'll know it's time to switch to heartbeat + // Once this is read, the client will expect us + // to send our full list of URIs followed by the + // "end" address. - list.remove(session.uri); - list.add(END_LIST); - - session.write(list); + // So we switch to WRITE LISTING and they switch + // to READ LISTING - session.state(java.nio.channels.SelectionKey.OP_WRITE, State.LISTING); + // Then we will switch to READ LISTING and they + // will switch to WRITE LISTING - } else { + final String message = session.read(); - // We are a SERVER in this relationship, so we will have already - // listed our known peers by this point. From here we switch to - // heartbeating - - // heartbeat time - if (session.hangup) { - session.state(0, State.CLOSED); - session.trace("hangup"); - hangup(key); + if (message == null) break; // need to read more - } else { + session.setURI(URI.create(message)); - session.trace("DONE READING"); + connected(session); - session.state(java.nio.channels.SelectionKey.OP_READ, State.HEARTBEAT); + session.trace("welcome"); - } + final ArrayList<URI> list = connections(); - } + // When they read themselves on the list + // they'll know it's time to list their URIs - break; + list.remove(me); // yank + list.add(END_LIST); // add to the end - } else { + session.write(list); - session.listed.add(uri); + session.state(SelectionKey.OP_WRITE, State.LISTING); - try { - connect(uri); - } catch (Exception e) { - println("connect failed " + uri + " - " + e.getMessage()); - e.printStackTrace(); - } - } - } + session.trace("STARTING"); - } - break; - case HEARTBEAT: { // read + } + break; - String message = null; - while ((message = session.read()) != null) { - tracker.processData(message); - } - } - break; - } + case LISTING: { // read - } + String message = null; - if (key.isWritable()) { + while ((message = session.read()) != null) { - Session session = (Session) key.attachment(); + session.trace(message); - switch (session.state) { - case GREETING: { // write + final URI uri = URI.create(message); - // Only CLIENTs write a GREETING message - // As we are a client, the first thing we do - // is READ the server's LIST - - if (session.drain()) { - session.state(java.nio.channels.SelectionKey.OP_READ, State.LISTING); - } + if (END_LIST.equals(uri)) { - } - break; + if (session.client) { - case LISTING: { // write + final ArrayList<URI> list = connections(); - if (session.drain()) { + for (URI reported : session.listed) { + list.remove(reported); + } - if (session.client) { - // CLIENTs list last, so at this point we've read - // the server's list and have written ours - - session.trace("DONE WRITING"); + // When they read us on the list + // they'll know it's time to switch to heartbeat - session.state(SelectionKey.OP_READ, State.HEARTBEAT); - - } else { - // SERVERs always write their list first, so at this - // point we switch to LIST READ mode + list.remove(session.uri); + list.add(END_LIST); - session.state(SelectionKey.OP_READ, State.LISTING); + session.write(list); - } - } - } - break; + session.state(SelectionKey.OP_WRITE, State.LISTING); - case HEARTBEAT: { // write + } else { - if (session.drain()) { + // We are a SERVER in this relationship, so we will have already + // listed our known peers by this point. From here we switch to + // heartbeating - session.last = System.currentTimeMillis(); + // heartbeat time + if (session.hangup) { + session.state(0, State.CLOSED); + session.trace("hangup"); + hangup(key); - session.trace("ping"); + } else { - session.state(java.nio.channels.SelectionKey.OP_READ, State.HEARTBEAT); + session.trace("DONE READING"); - } + session.state(SelectionKey.OP_READ, State.HEARTBEAT); } - break; - } - } - } catch (CancelledKeyException ex) { - synchronized (connect) { - Session session = (Session) key.attachment(); - if (session.state != State.CLOSED) { - close(key); } - } - } catch (ClosedChannelException ex) { - synchronized (connect) { - Session session = (Session) key.attachment(); - if (session.state != State.CLOSED) { - close(key); + + break; + + } else { + + session.listed.add(uri); + + try { + connect(uri); + } catch (Exception e) { + println("connect failed " + uri + " - " + e.getMessage()); + e.printStackTrace(); } } - } catch (IOException ex) { - Session session = (Session) key.attachment(); - session.trace(ex.getClass().getSimpleName() + ": " + ex.getMessage()); - close(key); } } + break; - for (SelectionKey key : selector.keys()) { - Session session = (Session) key.attachment(); + case HEARTBEAT: { // read - try { - if (session != null && session.state == State.HEARTBEAT) session.tick(); - } catch (IOException ex) { - close(key); + String message = null; + while ((message = session.read()) != null) { + session.trace(message); + tracker.processData(message); } } + break; + } + } - synchronized (connect) { - while (connect.size() > 0) { + private void doConnect(SelectionKey key) throws IOException { + // we are a client - URI uri = connect.removeFirst(); + final Session session = (Session) key.attachment(); + session.channel.finishConnect(); + session.trace("connected"); - if (connections.containsKey(uri)) continue; + // when you are a client, first say high to everyone + // before accepting data - int port = uri.getPort(); - String host = uri.getHost(); + // once a server reads our address, it will send it's + // full list of known addresses, followed by the "end" + // address to signal that it is done. - try { - println("open " + uri); + // we will then send our full list of known addresses, + // followed by the "end" address to signal we are done. - SocketChannel socketChannel = SocketChannel.open(); - socketChannel.configureBlocking(false); + // Afterward the server will only pulls its heartbeat - InetSocketAddress address = new InetSocketAddress(host, port); + // separately, we will initiate connections to everyone + // in the list who we have not yet seen. - socketChannel.connect(address); + // WRITE our GREETING + session.write(me); - Session session = new Session(socketChannel, address, uri); - session.ops(java.nio.channels.SelectionKey.OP_CONNECT); - session.trace("client"); - connections.put(session.uri, session); - - // seen - needs to get maintained as "connected" - // TODO remove from seen - } catch (IOException e) { - log.warning("Error connecting to " + host + ":" + port, e); - } - } - } - } + session.state(SelectionKey.OP_WRITE, State.GREETING); + } + + private void doAccept(SelectionKey key) throws IOException { + // we are a server + + // when you are a server, we must first listen for the + // address of the client before sending data. + + // once they send us their address, we will send our + // full list of known addresses, followed by the "end" + // address to signal that we are done. + + // Afterward we will only pulls our heartbeat + + final ServerSocketChannel server = (ServerSocketChannel) key.channel(); + final SocketChannel client = server.accept(); + final InetSocketAddress address = (InetSocketAddress) client.socket().getRemoteSocketAddress(); + + client.configureBlocking(false); + + final Session session = new Session(client, address, null); + session.trace("accept"); + session.state(SelectionKey.OP_READ, State.GREETING); } private ArrayList<URI> connections() { synchronized (connect) { - ArrayList<URI> list = new ArrayList<URI>(connections.keySet()); + final ArrayList<URI> list = new ArrayList<URI>(connections.keySet()); list.addAll(connect); return list; } } private void close(SelectionKey key) { - Session session = (Session) key.attachment(); + final Session session = (Session) key.attachment(); session.state(0, State.CLOSED); @@ -675,7 +731,7 @@ public class MultipointServer { // by us. We will both have detected this situation // and know it needs fixing. Only one of us can hangup - Session[] sessions = {session, duplicate}; + final Session[] sessions = {session, duplicate}; Arrays.sort(sessions, new Comparator<Session>() { // Goal: Keep the connection with the lowest port number /// @@ -700,13 +756,13 @@ public class MultipointServer { } private int server(Session a) { - Socket socket = a.channel.socket(); - return a.client? socket.getPort(): socket.getLocalPort(); + final Socket socket = a.channel.socket(); + return a.client ? socket.getPort() : socket.getLocalPort(); } private int client(Session a) { - Socket socket = a.channel.socket(); - return !a.client? socket.getPort(): socket.getLocalPort(); + final Socket socket = a.channel.socket(); + return !a.client ? socket.getPort() : socket.getLocalPort(); } }); @@ -724,8 +780,139 @@ public class MultipointServer { } private void println(String s) { -// if (s.matches(".*(Listening|DONE|KEEP|KILL)")) { +// if (debug && s.matches(".*(Listening|DONE|KEEP|KILL)")) { // System.out.format("%1$tH:%1$tM:%1$tS.%1$tL - %2$s\n", System.currentTimeMillis(), s); // } } + + @Override + public String toString() { + return "MultipointServer{" + + "name='" + name + '\'' + + ", me=" + me + + '}'; + } + + private static String randomColor() { + String[] colors = { + "almond", + "amber", + "amethyst", + "apple", + "apricot", + "aqua", + "aquamarine", + "ash", + "azure", + "banana", + "beige", + "black", + "blue", + "brick", + "bronze", + "brown", + "burgundy", + "carrot", + "charcoal", + "cherry", + "chestnut", + "chocolate", + "chrome", + "cinnamon", + "citrine", + "cobalt", + "copper", + "coral", + "cornflower", + "cotton", + "cream", + "crimson", + "cyan", + "ebony", + "emerald", + "forest", + "fuchsia", + "ginger", + "gold", + "goldenrod", + "gray", + "green", + "grey", + "indigo", + "ivory", + "jade", + "jasmine", + "khaki", + "lava", + "lavender", + "lemon", + "lilac", + "lime", + "macaroni", + "magenta", + "magnolia", + "mahogany", + "malachite", + "mango", + "maroon", + "mauve", + "mint", + "moonstone", + "navy", + "ocean", + "olive", + "onyx", + "orange", + "orchid", + "papaya", + "peach", + "pear", + "pearl", + "periwinkle", + "pine", + "pink", + "pistachio", + "platinum", + "plum", + "prune", + "pumpkin", + "purple", + "quartz", + "raspberry", + "red", + "rose", + "rosewood", + "ruby", + "salmon", + "sapphire", + "scarlet", + "sienna", + "silver", + "slate", + "strawberry", + "tan", + "tangerine", + "taupe", + "teal", + "titanium", + "topaz", + "turquoise", + "umber", + "vanilla", + "violet", + "watermelon", + "white", + "yellow" + }; + + final Random random = new Random(); + long l = random.nextLong(); + + if (l < 0) l *= -1; + + final long index = l % colors.length; + final String s = colors[(int) index]; + + return s; + } } Modified: openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/Tracker.java URL: http://svn.apache.org/viewvc/openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/Tracker.java?rev=1221925&r1=1221924&r2=1221925&view=diff ============================================================================== --- openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/Tracker.java (original) +++ openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/Tracker.java Wed Dec 21 22:48:19 2011 @@ -48,8 +48,9 @@ public class Tracker { private final int maxReconnectAttempts; private final long exponentialBackoff; private final boolean useExponentialBackOff; + private final boolean debug; - public Tracker(String group, long heartRate, int maxMissedHeartbeats, long reconnectDelay, long maxReconnectDelay, int maxReconnectAttempts, long exponentialBackoff, final Logger log) { + public Tracker(String group, long heartRate, int maxMissedHeartbeats, long reconnectDelay, long maxReconnectDelay, int maxReconnectAttempts, long exponentialBackoff, final Logger log, boolean debug) { this.group = group; this.groupPrefix = group + ":"; @@ -61,7 +62,7 @@ public class Tracker { this.exponentialBackoff = exponentialBackoff; this.useExponentialBackOff = exponentialBackoff > 1; this.log = log; - + this.debug = debug; this.log.info("Created " + this); } @@ -74,6 +75,10 @@ public class Tracker { return heartRate; } + public int getMaxMissedHeartbeats() { + return maxMissedHeartbeats; + } + public void setDiscoveryListener(DiscoveryListener discoveryListener) { this.discoveryListener = discoveryListener; } @@ -147,7 +152,7 @@ public class Tracker { for (ServiceVitals serviceVitals : discoveredServices.values()) { if (serviceVitals.getLastHeartbeat() < expireTime && !isSelf(serviceVitals.service)) { - if (log.isDebugEnabled()) { + if (debug()) { log.debug("Expired " + serviceVitals.service + String.format(" Timeout{lastSeen=%s, threshold=%s}", serviceVitals.getLastHeartbeat() - now, threshold )); } @@ -159,6 +164,10 @@ public class Tracker { } } + private boolean debug() { + return debug && log.isDebugEnabled(); + } + private final Executor executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() { public Thread newThread(Runnable runable) { Thread t = new Thread(runable, "Discovery Agent Notifier"); @@ -168,7 +177,7 @@ public class Tracker { }); private void fireServiceRemovedEvent(final URI uri) { - if (log.isDebugEnabled()) { + if (debug()) { log.debug(String.format("Removed Service{uri=%s}", uri)); } @@ -189,7 +198,7 @@ public class Tracker { } private void fireServiceAddedEvent(final URI uri) { - if (log.isDebugEnabled()) { + if (debug()) { log.debug(String.format("Added Service{uri=%s}", uri)); } @@ -262,7 +271,7 @@ public class Tracker { // Consider that the service recovery has succeeded if it has not // failed in 60 seconds. if (!dead && failureCount > 0 && (lastHeartBeat - recoveryTime) > 1000 * 60) { - if (log.isDebugEnabled()) { + if (debug()) { log.debug("I now think that the " + service + " service has recovered."); } failureCount = 0; @@ -289,7 +298,7 @@ public class Tracker { delay = reconnectDelay; } - if (log.isDebugEnabled()) { + if (debug()) { log.debug("Remote failure of " + service + " while still receiving multicast advertisements. " + "Advertising events will be suppressed for " + delay + " ms, the current failure count is: " + failureCount); @@ -312,7 +321,7 @@ public class Tracker { // Are we done trying to recover this guy? if (maxReconnectAttempts > 0 && failureCount > maxReconnectAttempts) { - if (log.isDebugEnabled()) { + if (debug()) { log.debug("Max reconnect attempts of the " + service + " service has been reached."); } return false; @@ -323,7 +332,7 @@ public class Tracker { return false; } - if (log.isDebugEnabled()) { + if (debug()) { log.debug("Resuming event advertisement of the " + service + " service."); } dead = false; @@ -357,6 +366,7 @@ public class Tracker { private long exponentialBackoff = 0; private int maxReconnectAttempts = 10; // todo: check this out private Logger logger; + private boolean debug; // --------------------------------- @@ -424,9 +434,17 @@ public class Tracker { this.logger = logger; } + public boolean isDebug() { + return debug; + } + + public void setDebug(boolean debug) { + this.debug = debug; + } + public Tracker build() { logger = Logger.getInstance(LogCategory.OPENEJB_SERVER.createChild("discovery"), Tracker.class); - return new Tracker(group, heartRate, maxMissedHeartbeats, reconnectDelay, maxReconnectDelay, maxReconnectAttempts, exponentialBackoff, logger); + return new Tracker(group, heartRate, maxMissedHeartbeats, reconnectDelay, maxReconnectDelay, maxReconnectAttempts, exponentialBackoff, logger, debug); } } Modified: openejb/branches/openejb-3.1.x/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgentTest.java URL: http://svn.apache.org/viewvc/openejb/branches/openejb-3.1.x/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgentTest.java?rev=1221925&r1=1221924&r2=1221925&view=diff ============================================================================== --- openejb/branches/openejb-3.1.x/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgentTest.java (original) +++ openejb/branches/openejb-3.1.x/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgentTest.java Wed Dec 21 22:48:19 2011 @@ -20,8 +20,6 @@ import junit.framework.TestCase; import org.apache.openejb.server.DiscoveryListener; import org.apache.openejb.server.DiscoveryRegistry; import org.apache.openejb.util.Join; -import org.apache.openejb.util.LogCategory; -import org.apache.openejb.util.Logger; import java.net.URI; import java.util.ArrayList; @@ -65,13 +63,13 @@ public class MultipointDiscoveryAgentTes }; final List<Node> nodes = new ArrayList<Node>(); - final Node root = new Node("0", listener); + final Node root = new Node(0, listener); nodes.add(root); for (int i = 0; i < PEERS; i++) { - final Node node = new Node("0", listener, root.getAgent().getPort()); + final Node node = new Node(0, listener, root.getAgent().getPort()); nodes.add(node); } @@ -100,14 +98,50 @@ public class MultipointDiscoveryAgentTes } } + public void _debug() throws Exception { + System.setProperty("log4j.category.OpenEJB.server.discovery", "debug"); + + System.setProperty("log4j.appender.C.layout", "org.apache.log4j.PatternLayout"); + System.setProperty("log4j.appender.C.layout.ConversionPattern", "%d - %m%n"); + + final URI greenService = new URI("green://localhost:5555"); + final Node green = new Node(5555, new Listener("green"), true, "green", 5000); + + green.getRegistry().registerService(greenService); + +// launch(green, "blue", 4444); +// launch(green, "red", 6666); +// launch(green, "yellow", 8888); + final Node orange = launch(green, "orange", 7777); + + Thread.sleep(500000); + + orange.getAgent().stop(); + + Thread.sleep(5000); + + } + + private Node launch(Node green, String color, int port) throws Exception { + final URI orangeService = new URI(color + "://localhost:"+ port); + final Node orange = new Node(port, new Listener(color), green.getPort()); + orange.getRegistry().registerService(orangeService); + Thread.sleep(100); + return orange; + } + public static class Node { private final MultipointDiscoveryAgent agent; private final DiscoveryRegistry registry; - public Node(String p, DiscoveryListener listener, int... peers) throws Exception { - this.agent = new MultipointDiscoveryAgent(); + public Node(int p, DiscoveryListener listener, int... peers) throws Exception { + this(p, listener, false, null, 5000, peers); + } + + public Node(int p, DiscoveryListener listener, boolean debug, String name, int heartRate, int... peers) throws Exception { + this.agent = new MultipointDiscoveryAgent(debug, name); final Properties props = new Properties(); - props.put("port", p); + props.put("port", p+""); List<String> uris = new ArrayList<String>(peers.length); for (int port : peers) { @@ -115,8 +149,8 @@ public class MultipointDiscoveryAgentTes } props.put("initialServers", Join.join(",", uris)); - props.put("max_missed_heartbeats", "2"); - props.put("heart_rate", "200"); + props.put("max_missed_heartbeats", "1"); + props.put("heart_rate", ""+ heartRate); agent.init(props); this.registry = new DiscoveryRegistry(agent); @@ -131,5 +165,25 @@ public class MultipointDiscoveryAgentTes public DiscoveryRegistry getRegistry() { return registry; } + + public int getPort() { + return agent.getPort(); + } + } + + private static class Listener implements DiscoveryListener { + private final String name; + + private Listener(String name) { + this.name = name; + } + + public void serviceAdded(URI service) { +// System.out.printf("[%s] added = %s\n", name, service); + } + + public void serviceRemoved(URI service) { +// System.out.printf("[%s] removed = %s\n", name, service); + } } }
