Repository: incubator-gossip Updated Branches: refs/heads/master e3010c854 -> c62ebaf9b
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-transport-udp/src/main/java/org/apache/gossip/transport/udp/UdpTransportManager.java ---------------------------------------------------------------------- diff --git a/gossip-transport-udp/src/main/java/org/apache/gossip/transport/udp/UdpTransportManager.java b/gossip-transport-udp/src/main/java/org/apache/gossip/transport/udp/UdpTransportManager.java index 3f509a6..d6aaa15 100644 --- a/gossip-transport-udp/src/main/java/org/apache/gossip/transport/udp/UdpTransportManager.java +++ b/gossip-transport-udp/src/main/java/org/apache/gossip/transport/udp/UdpTransportManager.java @@ -19,6 +19,7 @@ package org.apache.gossip.transport.udp; import org.apache.gossip.manager.GossipCore; import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.model.Base; import org.apache.gossip.transport.AbstractTransportManager; import org.apache.log4j.Logger; @@ -30,12 +31,13 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.SocketException; import java.net.URI; +import java.util.concurrent.atomic.AtomicBoolean; /** * This class is constructed by reflection in GossipManager. * It manages transport (byte read/write) operations over UDP. */ -public class UdpTransportManager extends AbstractTransportManager { +public class UdpTransportManager extends AbstractTransportManager implements Runnable { public static final Logger LOGGER = Logger.getLogger(UdpTransportManager.class); @@ -44,12 +46,14 @@ public class UdpTransportManager extends AbstractTransportManager { private final int soTimeout; + private final Thread me; + + private final AtomicBoolean keepRunning = new AtomicBoolean(true); + /** required for reflection to work! */ public UdpTransportManager(GossipManager gossipManager, GossipCore gossipCore) { super(gossipManager, gossipCore); - soTimeout = gossipManager.getSettings().getGossipInterval() * 2; - try { SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getUri().getHost(), gossipManager.getMyself().getUri().getPort()); @@ -58,12 +62,38 @@ public class UdpTransportManager extends AbstractTransportManager { LOGGER.warn(ex); throw new RuntimeException(ex); } + me = new Thread(this); } @Override + public void run() { + while (keepRunning.get()) { + try { + byte[] buf = read(); + try { + Base message = gossipManager.getProtocolManager().read(buf); + gossipCore.receive(message); + //TODO this is suspect + gossipManager.getMemberStateRefresher().run(); + } catch (RuntimeException ex) {//TODO trap json exception + LOGGER.error("Unable to process message", ex); + } + } catch (IOException e) { + // InterruptedException are completely normal here because of the blocking lifecycle. + if (!(e.getCause() instanceof InterruptedException)) { + LOGGER.error(e); + } + keepRunning.set(false); + } + } + } + + @Override public void shutdown() { + keepRunning.set(false); server.close(); super.shutdown(); + me.interrupt(); } /** @@ -81,13 +111,13 @@ public class UdpTransportManager extends AbstractTransportManager { @Override public void send(URI endpoint, byte[] buf) throws IOException { - DatagramSocket socket = new DatagramSocket(); - socket.setSoTimeout(soTimeout); - InetAddress dest = InetAddress.getByName(endpoint.getHost()); - DatagramPacket payload = new DatagramPacket(buf, buf.length, dest, endpoint.getPort()); - socket.send(payload); // todo: investigate UDP socket reuse. It would save a little setup/teardown time wrt to the local socket. - socket.close(); + try (DatagramSocket socket = new DatagramSocket()){ + socket.setSoTimeout(soTimeout); + InetAddress dest = InetAddress.getByName(endpoint.getHost()); + DatagramPacket payload = new DatagramPacket(buf, buf.length, dest, endpoint.getPort()); + socket.send(payload); + } } private void debug(byte[] jsonBytes) { @@ -96,4 +126,10 @@ public class UdpTransportManager extends AbstractTransportManager { LOGGER.debug("Received message ( bytes): " + receivedMessage); } } + + @Override + public void startEndpoint() { + me.start(); + } + } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-transport-udp/src/test/java/org/apache/gossip/transport/udp/UdpTransportIntegrationTest.java ---------------------------------------------------------------------- diff --git a/gossip-transport-udp/src/test/java/org/apache/gossip/transport/udp/UdpTransportIntegrationTest.java b/gossip-transport-udp/src/test/java/org/apache/gossip/transport/udp/UdpTransportIntegrationTest.java index 5258374..8a27d0a 100644 --- a/gossip-transport-udp/src/test/java/org/apache/gossip/transport/udp/UdpTransportIntegrationTest.java +++ b/gossip-transport-udp/src/test/java/org/apache/gossip/transport/udp/UdpTransportIntegrationTest.java @@ -17,14 +17,9 @@ */ package org.apache.gossip.transport.udp; -import org.apache.gossip.GossipSettings; -import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - public class UdpTransportIntegrationTest { // It's currently impossible to create a UdpTransportManager without bringing up an entire stack. http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 97aa409..1c48306 100644 --- a/pom.xml +++ b/pom.xml @@ -58,6 +58,7 @@ <module>gossip-base</module> <module>gossip-transport-udp</module> <module>gossip-protocol-jackson</module> + <module>gossip-itest</module> </modules> <description>A peer to peer cluster discovery service</description>