Author: andygumbrecht
Date: Thu Jul 5 16:32:05 2012
New Revision: 1357726
URL: http://svn.apache.org/viewvc?rev=1357726&view=rev
Log:
Fix multipulse test.
Fix multipulse services substring - Thanks to the test ;-)
Modified:
openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
openejb/trunk/openejb/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
Modified:
openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
URL:
http://svn.apache.org/viewvc/openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java?rev=1357726&r1=1357725&r2=1357726&view=diff
==============================================================================
---
openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
(original)
+++
openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
Thu Jul 5 16:32:05 2012
@@ -137,7 +137,6 @@ public class MulticastPulseClient extend
final byte[] bytes = (MulticastPulseClient.CLIENT +
forGroup).getBytes(UTF8);
final DatagramPacket request = new DatagramPacket(bytes, bytes.length,
new InetSocketAddress(ia, port));
-
final AtomicBoolean running = new AtomicBoolean(true);
final MulticastSocket[] clientSockets =
MulticastPulseClient.getSockets(ia, port);
final Timer timer = new Timer(true);
@@ -210,7 +209,7 @@ public class MulticastPulseClient extend
continue;
}
- final String services = s.substring(0,
s.indexOf('|'));
+ final String services = s.substring(0,
s.lastIndexOf('|'));
s = s.substring(services.length() + 1);
final String[] serviceList =
services.split("\\|");
@@ -319,6 +318,8 @@ public class MulticastPulseClient extend
}
}
+ futures.clear();
+
return set;
}
Modified:
openejb/trunk/openejb/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
URL:
http://svn.apache.org/viewvc/openejb/trunk/openejb/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java?rev=1357726&r1=1357725&r2=1357726&view=diff
==============================================================================
---
openejb/trunk/openejb/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
(original)
+++
openejb/trunk/openejb/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
Thu Jul 5 16:32:05 2012
@@ -17,6 +17,8 @@
package org.apache.openejb.server.discovery;
import org.apache.openejb.server.DiscoveryListener;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
import java.net.DatagramPacket;
@@ -28,6 +30,7 @@ import java.net.SocketAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
@@ -36,6 +39,9 @@ import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -46,19 +52,23 @@ import java.util.concurrent.atomic.Atomi
public class MulticastPulseAgentTest {
private static final Set<String> schemes = new
HashSet<String>(Arrays.asList("ejbd", "ejbds", "http"));
+ private static ExecutorService executor;
+ private static final Charset utf8 = Charset.forName("UTF-8");
+ private static final String forGroup = "*";
+ private static final String host = "239.255.3.2";
+ private static final int port = 6142;
+ private static MulticastPulseAgent agent;
- @Test
- public void test() throws Exception {
+ @BeforeClass
+ public static void beforeClass() throws Exception {
- final String group = "*";
- final String host = "239.255.3.2";
- final int port = 6142;
+ executor = Executors.newFixedThreadPool(10);
final Properties p = new Properties();
p.setProperty("bind", host);
p.setProperty("port", "" + 6142);
- MulticastPulseAgent agent = new MulticastPulseAgent();
+ agent = new MulticastPulseAgent();
agent.init(p);
agent.setDiscoveryListener(new MyDiscoveryListener("test"));
agent.registerService(new URI("ejb:ejbd://[::]:4201"));
@@ -66,22 +76,57 @@ public class MulticastPulseAgentTest {
agent.registerService(new URI("ejb:http://127.0.0.1:4201"));
agent.registerService(new URI("ejb:https://0.0.0.1:4201"));
agent.start();
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ agent.stop();
+ executor.shutdownNow();
+ }
+
+ @Test
+ public void test() throws Exception {
- final byte[] bytes = (MulticastPulseAgent.CLIENT +
group).getBytes(Charset.forName("utf8"));
- final InetAddress ia = InetAddress.getByName(host);
- DatagramPacket dp = new DatagramPacket(bytes, bytes.length, new
InetSocketAddress(ia, port));
+ final InetAddress ia;
+
+ try {
+ ia = InetAddress.getByName(host);
+ } catch (UnknownHostException e) {
+ throw new Exception(host + " is not a valid address", e);
+ }
+
+ if (null == ia || !ia.isMulticastAddress()) {
+ throw new Exception(host + " is not a valid multicast address");
+ }
+
+ final byte[] bytes = (MulticastPulseAgent.CLIENT +
forGroup).getBytes(utf8);
+ final DatagramPacket request = new DatagramPacket(bytes, bytes.length,
new InetSocketAddress(ia, port));
final AtomicBoolean running = new AtomicBoolean(true);
- final AtomicBoolean passed = new AtomicBoolean(false);
- final MulticastSocket client = MulticastPulseAgent.getSockets(host,
port)[0];
+
+ final MulticastSocket[] clientSockets =
MulticastPulseAgent.getSockets(host, port);
final Timer timer = new Timer(true);
+
final Set<URI> set = new TreeSet<URI>(new Comparator<URI>() {
@Override
public int compare(URI u1, URI u2) {
+ //Ignore server hostname
+ final String serverHost = u1.getScheme();
+ u1 = URI.create(u1.getSchemeSpecificPart());
+ u2 = URI.create(u2.getSchemeSpecificPart());
+
+ //Ignore scheme (ejb,ejbs,etc.)
u1 = URI.create(u1.getSchemeSpecificPart());
u2 = URI.create(u2.getSchemeSpecificPart());
+ if (u1.getHost().equals(serverHost)) {
+ //If the service host is the same as the server host
+ //then keep it at the top of the list
+ return -1;
+ }
+
+ //Compare URI hosts
int i = u1.getHost().compareTo(u2.getHost());
if (i == 0) {
@@ -92,117 +137,170 @@ public class MulticastPulseAgentTest {
}
});
- //Start a thread that listens for multicast packets
- final Thread t = new Thread() {
- @Override
- public void run() {
+ //Start threads that listen for multicast packets on our channel.
+ //These need to start 'before' we pulse a request.
+ final ArrayList<Future> futures = new ArrayList<Future>();
- while (running.get()) {
- try {
+ for (final MulticastSocket socket : clientSockets) {
- final DatagramPacket dgp = new DatagramPacket(new
byte[512], 512);
+ futures.add(executor.submit(new Runnable() {
+ @Override
+ public void run() {
- client.receive(dgp);
+ final DatagramPacket response = new DatagramPacket(new
byte[2048], 2048);
- final SocketAddress sa = dgp.getSocketAddress();
+ while (running.get()) {
+ try {
- if (null != sa) {
+ socket.receive(response);
- String s = new String(dgp.getData()).trim();
- if (s.startsWith(MulticastPulseAgent.SERVER)) {
+ final SocketAddress sa =
response.getSocketAddress();
- s = (s.replace(MulticastPulseAgent.SERVER,
""));
- final String group = s.substring(0,
s.indexOf(':'));
- s = s.substring(group.length() + 1);
+ if (null != sa && (sa instanceof
InetSocketAddress)) {
- final String services = s.substring(0,
s.lastIndexOf('|'));
- s = s.substring(services.length() + 1);
+ int len = response.getLength();
+ if (len > 2048) {
+ len = 2048;
+ }
- final String[] service = services.split("\\|");
- final String[] hosts = s.split(",");
+ String s = new String(response.getData(), 0,
len);
- System.out.println(String.format("Client
received Server pulse:\n\t%1$s\n\t%2$s\n\t%3$s\n", group, services, s));
+ if (s.startsWith(MulticastPulseAgent.SERVER)) {
- for (String svc : service) {
+ s = (s.replace(MulticastPulseAgent.SERVER,
""));
+ final String group = s.substring(0,
s.indexOf(':'));
+ s = s.substring(group.length() + 1);
- if (MulticastPulseAgent.EMPTY.equals(svc))
{
+ if (!"*".equals(forGroup) &&
!forGroup.equals(group)) {
continue;
}
- URI test = null;
- try {
- test = URI.create(svc);
- } catch (Throwable e) {
- continue;
- }
+ final String services = s.substring(0,
s.lastIndexOf('|'));
+ s = s.substring(services.length() + 1);
+
+ final String[] serviceList =
services.split("\\|");
+ final String[] hosts = s.split(",");
- if (schemes.contains(test.getScheme())) {
+ System.out.println(String.format("Client
received Server pulse:\n\t%1$s\n\t%2$s\n\t%3$s\n", group, services, s));
- svc = (group + ":" + svc);
+ for (String svc : serviceList) {
+
+ if
(MulticastPulseAgent.EMPTY.equals(svc)) {
+ continue;
+ }
+ final URI serviceUri;
try {
- if (svc.contains("0.0.0.0")) {
- for (final String h : hosts) {
-
set.add(URI.create(svc.replace("0.0.0.0", ipFormat(h))));
+ serviceUri = URI.create(svc);
+ } catch (Throwable e) {
+ continue;
+ }
+
+ if
(schemes.contains(serviceUri.getScheme())) {
+
+ //Just because multicast was
received on this host is does not mean the service is on the same
+ //We can however use this to
identify an individual machine and group
+ final String serverHost =
((InetSocketAddress) response.getSocketAddress()).getAddress().getHostAddress();
+
+ final String serviceHost =
serviceUri.getHost();
+ if
(MulticastPulseAgent.isLocalAddress(serviceHost, false)) {
+ if
(!MulticastPulseAgent.isLocalAddress(serverHost, false)) {
+ //A local service is only
available to a local client
+ continue;
}
- } else if (svc.contains("[::]")) {
- for (final String h : hosts) {
-
set.add(URI.create(svc.replace("[::]", ipFormat(h))));
+ }
+
+ svc = ("mp-" + serverHost + ":" +
group + ":" + svc);
+
+ try {
+ if (svc.contains("0.0.0.0")) {
+ for (final String h :
hosts) {
+
set.add(URI.create(svc.replace("0.0.0.0", ipFormat(h))));
+ }
+ } else if
(svc.contains("[::]")) {
+ for (final String h :
hosts) {
+
set.add(URI.create(svc.replace("[::]", ipFormat(h))));
+ }
+ } else {
+ //Just add as is
+ set.add(URI.create(svc));
}
- } else {
- //Just add as is
- set.add(URI.create(svc));
+ } catch (Throwable e) {
+ //Ignore
}
- } catch (Throwable e) {
- //Ignore
+ } else {
+ System.out.println("Reject
service: " + serviceUri.toASCIIString());
}
}
}
-
- running.set(false);
- timer.cancel();
- passed.set(true);
}
+
+ } catch (Throwable e) {
+ //Ignore
}
+ }
+
+ System.out.println("Exit MulticastPulse client thread");
+ }
+ }));
+ }
+
+ //Pulse the server - It is thread safe to use same sockets as
send/receive synchronization is only on the packet
+ for (final MulticastSocket socket : clientSockets) {
+ try {
+ socket.send(request);
+ } catch (Throwable e) {
+ //Ignore
+ }
+ }
+
+ //Kill the threads after timeout
+ timer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+
+ running.set(false);
+
+ for (Future future : futures) {
+ try {
+ future.cancel(true);
+ } catch (Throwable e) {
+ //Ignore
+ }
+ }
+ for (final MulticastSocket socket : clientSockets) {
+
+ try {
+ socket.leaveGroup(ia);
+ } catch (Throwable e) {
+ //Ignore
+ }
+ try {
+ socket.close();
} catch (Throwable e) {
//Ignore
}
}
+ }
+ }, 1500);
- System.out.println("Exit MulticastPulse client thread");
+ //Wait for threads to complete
+ for (final Future future : futures) {
+ try {
+ future.get();
+ } catch (Throwable e) {
+ //Ignore
}
- };
- t.setDaemon(true);
- t.start();
-
- if (running.get()) {
- //Kill the thread after timeout
- timer.schedule(new TimerTask() {
- @Override
- public void run() {
- running.set(false);
- client.close();
- t.interrupt();
- System.out.println("Interrupted MultiPulse client");
- }
- }, 1000);
}
- //Pulse the server
- final MulticastSocket ms = MulticastPulseAgent.getSockets(host,
port)[0];
- ms.send(dp);
-
- //Wait for thread to die
- t.join();
-
- agent.stop();
+ futures.clear();
- for (URI uri : set) {
+ for (final URI uri : set) {
System.out.println(uri.toASCIIString());
}
- org.junit.Assert.assertTrue(passed.get());
+ org.junit.Assert.assertTrue(set.size() > 0);
}
private String ipFormat(final String h) throws UnknownHostException {