Repository: incubator-gossip
Updated Branches:
  refs/heads/master e3010c854 -> c62ebaf9b


http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-transport-udp/src/main/java/org/apache/gossip/transport/udp/UdpTransportManager.java
----------------------------------------------------------------------
diff --git a/gossip-transport-udp/src/main/java/org/apache/gossip/transport/udp/UdpTransportManager.java b/gossip-transport-udp/src/main/java/org/apache/gossip/transport/udp/UdpTransportManager.java
index 3f509a6..d6aaa15 100644
--- a/gossip-transport-udp/src/main/java/org/apache/gossip/transport/udp/UdpTransportManager.java
+++ b/gossip-transport-udp/src/main/java/org/apache/gossip/transport/udp/UdpTransportManager.java
@@ -19,6 +19,7 @@ package org.apache.gossip.transport.udp;
 
 import org.apache.gossip.manager.GossipCore;
 import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.model.Base;
 import org.apache.gossip.transport.AbstractTransportManager;
 import org.apache.log4j.Logger;
 
@@ -30,12 +31,13 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.SocketException;
 import java.net.URI;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * This class is constructed by reflection in GossipManager.
  * It manages transport (byte read/write) operations over UDP.
  */
-public class UdpTransportManager extends AbstractTransportManager {
+public class UdpTransportManager extends AbstractTransportManager implements 
Runnable {
   
   public static final Logger LOGGER = 
Logger.getLogger(UdpTransportManager.class);
   
@@ -44,12 +46,14 @@ public class UdpTransportManager extends AbstractTransportManager {
   
   private final int soTimeout;
   
+  private final Thread me;
+  
+  private final AtomicBoolean keepRunning = new AtomicBoolean(true);
+  
   /** required for reflection to work! */
   public UdpTransportManager(GossipManager gossipManager, GossipCore 
gossipCore) {
     super(gossipManager, gossipCore);
-    
     soTimeout = gossipManager.getSettings().getGossipInterval() * 2;
-    
     try {
       SocketAddress socketAddress = new 
InetSocketAddress(gossipManager.getMyself().getUri().getHost(),
               gossipManager.getMyself().getUri().getPort());
@@ -58,12 +62,38 @@ public class UdpTransportManager extends AbstractTransportManager {
       LOGGER.warn(ex);
       throw new RuntimeException(ex);
     }
+    me = new Thread(this);
   }
 
   @Override
+  public void run() {
+    while (keepRunning.get()) {
+      try {
+        byte[] buf = read();
+        try {
+          Base message = gossipManager.getProtocolManager().read(buf);
+          gossipCore.receive(message);
+          //TODO this is suspect
+          gossipManager.getMemberStateRefresher().run();
+        } catch (RuntimeException ex) {//TODO trap json exception
+          LOGGER.error("Unable to process message", ex);
+        }
+      } catch (IOException e) {
+        // InterruptedException are completely normal here because of the blocking lifecycle.
+        if (!(e.getCause() instanceof InterruptedException)) {
+          LOGGER.error(e);
+        }
+        keepRunning.set(false);
+      }
+    }
+  }
+  
+  @Override
   public void shutdown() {
+    keepRunning.set(false);
     server.close();
     super.shutdown();
+    me.interrupt();
   }
 
   /**
@@ -81,13 +111,13 @@ public class UdpTransportManager extends AbstractTransportManager {
 
   @Override
   public void send(URI endpoint, byte[] buf) throws IOException {
-    DatagramSocket socket = new DatagramSocket();
-    socket.setSoTimeout(soTimeout);
-    InetAddress dest = InetAddress.getByName(endpoint.getHost());
-    DatagramPacket payload = new DatagramPacket(buf, buf.length, dest, 
endpoint.getPort());
-    socket.send(payload);
     // todo: investigate UDP socket reuse. It would save a little setup/teardown time wrt to the local socket.
-    socket.close();
+    try (DatagramSocket socket = new DatagramSocket()){
+      socket.setSoTimeout(soTimeout);
+      InetAddress dest = InetAddress.getByName(endpoint.getHost());
+      DatagramPacket payload = new DatagramPacket(buf, buf.length, dest, 
endpoint.getPort());
+      socket.send(payload);
+    }
   }
   
   private void debug(byte[] jsonBytes) {
@@ -96,4 +126,10 @@ public class UdpTransportManager extends AbstractTransportManager {
       LOGGER.debug("Received message ( bytes): " + receivedMessage);
     }
   }
+
+  @Override
+  public void startEndpoint() {
+    me.start();
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-transport-udp/src/test/java/org/apache/gossip/transport/udp/UdpTransportIntegrationTest.java
----------------------------------------------------------------------
diff --git a/gossip-transport-udp/src/test/java/org/apache/gossip/transport/udp/UdpTransportIntegrationTest.java b/gossip-transport-udp/src/test/java/org/apache/gossip/transport/udp/UdpTransportIntegrationTest.java
index 5258374..8a27d0a 100644
--- a/gossip-transport-udp/src/test/java/org/apache/gossip/transport/udp/UdpTransportIntegrationTest.java
+++ b/gossip-transport-udp/src/test/java/org/apache/gossip/transport/udp/UdpTransportIntegrationTest.java
@@ -17,14 +17,9 @@
  */
 package org.apache.gossip.transport.udp;
 
-import org.apache.gossip.GossipSettings;
-import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
 public class UdpTransportIntegrationTest {
   
   // It's currently impossible to create a UdpTransportManager without bringing up an entire stack.

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 97aa409..1c48306 100644
--- a/pom.xml
+++ b/pom.xml
@@ -58,6 +58,7 @@
                <module>gossip-base</module>
                <module>gossip-transport-udp</module>
                <module>gossip-protocol-jackson</module>
+               <module>gossip-itest</module>
        </modules>
        
        <description>A peer to peer cluster discovery service</description>

Reply via email to