Author: andygumbrecht
Date: Fri Jul 6 09:33:24 2012
New Revision: 1358092
URL: http://svn.apache.org/viewvc?rev=1358092&view=rev
Log:
Trying a latch - Also makes sense for client and agent.
Modified:
openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
openejb/trunk/openejb/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
Modified:
openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
URL:
http://svn.apache.org/viewvc/openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java?rev=1358092&r1=1358091&r2=1358092&view=diff
==============================================================================
---
openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
(original)
+++
openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
Fri Jul 6 09:33:24 2012
@@ -4,6 +4,7 @@ import java.io.IOException;
import java.net.*;
import java.nio.charset.Charset;
import java.util.*;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -174,6 +175,7 @@ public class MulticastPulseClient extend
//Start threads that listen for multicast packets on our channel.
//These need to start 'before' we pulse a request.
final ArrayList<Future> futures = new ArrayList<Future>();
+ final CountDownLatch latch = new CountDownLatch(clientSockets.length);
for (final MulticastSocket socket : clientSockets) {
@@ -181,6 +183,7 @@ public class MulticastPulseClient extend
@Override
public void run() {
+ latch.countDown();
final DatagramPacket response = new DatagramPacket(new
byte[2048], 2048);
while (running.get()) {
@@ -273,13 +276,21 @@ public class MulticastPulseClient extend
}));
}
- //Pulse the server - It is thread safe to use same sockets as
send/receive synchronization is only on the packet
- for (final MulticastSocket socket : clientSockets) {
- try {
- socket.send(request);
- } catch (Throwable e) {
- //Ignore
+ try {
+ latch.await();
+
+ //Pulse the server - It is thread safe to use same sockets as
send/receive synchronization is only on the packet
+ for (final MulticastSocket socket : clientSockets) {
+ try {
+ socket.send(request);
+ } catch (Throwable e) {
+ //Ignore
+ }
}
+
+ } catch (InterruptedException e) {
+ //Terminate as quickly as possible
+ timeout = 1;
}
//Kill the threads after timeout
Modified:
openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
URL:
http://svn.apache.org/viewvc/openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java?rev=1358092&r1=1358091&r2=1358092&view=diff
==============================================================================
---
openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
(original)
+++
openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
Fri Jul 6 09:33:24 2012
@@ -20,10 +20,12 @@ import java.util.Enumeration;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
@@ -194,6 +196,8 @@ public class MulticastPulseAgent impleme
throw new ServiceException("Failed to get Multicast sockets",
e);
}
+ final CountDownLatch latch = new
CountDownLatch(this.sockets.length);
+
for (final MulticastSocket socket : this.sockets) {
this.futures.add(executor.submit(new Runnable() {
@@ -201,6 +205,7 @@ public class MulticastPulseAgent impleme
public void run() {
final DatagramPacket request = new DatagramPacket(new
byte[2048], 2048);
+ latch.countDown();
while (MulticastPulseAgent.this.running.get()) {
@@ -250,6 +255,12 @@ public class MulticastPulseAgent impleme
}
}));
}
+
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ this.stop();
+ }
}
}
Modified:
openejb/trunk/openejb/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
URL:
http://svn.apache.org/viewvc/openejb/trunk/openejb/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java?rev=1358092&r1=1358091&r2=1358092&view=diff
==============================================================================
---
openejb/trunk/openejb/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
(original)
+++
openejb/trunk/openejb/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
Fri Jul 6 09:33:24 2012
@@ -39,6 +39,7 @@ import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -66,7 +67,7 @@ public class MulticastPulseAgentTest {
final Properties p = new Properties();
p.setProperty("bind", host);
- p.setProperty("port", "" + 6142);
+ p.setProperty("port", "" + port);
agent = new MulticastPulseAgent();
agent.init(p);
@@ -144,6 +145,7 @@ public class MulticastPulseAgentTest {
//Start threads that listen for multicast packets on our channel.
//These need to start 'before' we pulse a request.
final ArrayList<Future> futures = new ArrayList<Future>();
+ final CountDownLatch latch = new CountDownLatch(clientSockets.length);
for (final MulticastSocket socket : clientSockets) {
@@ -161,6 +163,8 @@ public class MulticastPulseAgentTest {
final DatagramPacket response = new DatagramPacket(new
byte[2048], 2048);
+ latch.countDown();
+
while (running.get()) {
try {
@@ -257,13 +261,25 @@ public class MulticastPulseAgentTest {
}));
}
- //Pulse the server - It is thread safe to use same sockets as
send/receive synchronization is only on the packet
- for (final MulticastSocket socket : clientSockets) {
- try {
- socket.send(request);
- } catch (Throwable e) {
- //Ignore
+ //Allow slow thread starts
+ System.out.println("Wait for threads to start");
+ int timeout = 5000;
+ try {
+
+ latch.await();
+ System.out.println("Threads have started");
+
+ //Pulse the server - It is thread safe to use same sockets as
send/receive synchronization is only on the packet
+ for (final MulticastSocket socket : clientSockets) {
+ try {
+ socket.send(request);
+ } catch (Throwable e) {
+ //Ignore
+ }
}
+
+ } catch (InterruptedException e) {
+ timeout = 1;
}
//Kill the threads after timeout
@@ -295,7 +311,7 @@ public class MulticastPulseAgentTest {
}
}
}
- }, 10000);
+ }, timeout);
//Wait for threads to complete
for (final Future future : futures) {
@@ -332,8 +348,6 @@ public class MulticastPulseAgentTest {
private final String id;
public MyDiscoveryListener(String id) {
- id += " ";
- id = id.substring(0, 8);
this.id = id;
}