Repository: incubator-gossip
Updated Branches:
  refs/heads/master c544b8bf1 -> 851cd93e6


GOSSIP-79 Isolate UDP and JSON code

With these changes, it should now be possible to create alternate serialization
formats (e.g. Gson or native) and transports (like HTTP).

To make this PR reviewable I decided against creating new modules right now. 
That can be done subsequently in another PR that doesn't modify any code.

* Creates two new interfaces: `TransportManager` and `ProtocolManager`
  * Implementation classes must honor a common constructor signature, because
    they are instantiated via reflection from class names in `GossipSettings`
* Includes UDP and Jackson implementations of those.
* `AbstractTransportManager` has a lot of boilerplate that includes:
  * starting the active gossiper, and
  * starting the passive gossiper.

I spent some time trying to polish the implementations to become less dependent 
on references to `GossipManager`. I still feel there is a lot of room for 
improvement.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/851cd93e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/851cd93e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/851cd93e

Branch: refs/heads/master
Commit: 851cd93e67674133407dd180e0d340b41e0fa4f9
Parents: c544b8b
Author: Gary Dusbabek <gdusba...@gmail.com>
Authored: Mon Apr 17 12:37:42 2017 -0500
Committer: Gary Dusbabek <gdusba...@gmail.com>
Committed: Wed Apr 19 09:32:58 2017 -0500

----------------------------------------------------------------------
 .../java/org/apache/gossip/GossipSettings.java  |  12 +-
 .../org/apache/gossip/manager/GossipCore.java   |  65 +-------
 .../apache/gossip/manager/GossipManager.java    | 154 +++++++++++--------
 .../gossip/manager/GossipManagerBuilder.java    |  20 +--
 .../gossip/manager/PassiveGossipThread.java     |  61 ++------
 .../gossip/manager/RingStatePersister.java      |  43 +++---
 .../gossip/manager/UserDataPersister.java       |  50 +++---
 .../OnlyProcessReceivedPassiveGossipThread.java |  33 ----
 .../gossip/protocol/JacksonProtocolManager.java | 131 ++++++++++++++++
 .../apache/gossip/protocol/ProtocolManager.java |  41 +++++
 .../transport/AbstractTransportManager.java     |  86 +++++++++++
 .../gossip/transport/TransportManager.java      |  40 +++++
 .../gossip/transport/UdpTransportManager.java   |  98 ++++++++++++
 .../apache/gossip/utils/ReflectionUtils.java    |  53 +++++++
 .../java/org/apache/gossip/crdt/OrSetTest.java  |  16 +-
 .../manager/GossipManagerBuilderTest.java       |  38 ++---
 .../gossip/manager/RingPersistenceTest.java     |   2 +-
 .../gossip/manager/UserDataPersistenceTest.java |   4 +-
 .../manager/handlers/MessageHandlerTest.java    |   2 +-
 19 files changed, 636 insertions(+), 313 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java 
b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
index 6b2bf8b..e4a95d3 100644
--- a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
+++ b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
@@ -45,6 +45,9 @@ public class GossipSettings {
   
   private String activeGossipClass = 
"org.apache.gossip.manager.SimpleActiveGossipper";
   
+  private String transportManagerClass = 
"org.apache.gossip.transport.UdpTransportManager";
+  private String protocolManagerClass = 
"org.apache.gossip.protocol.JacksonProtocolManager";
+  
   private Map<String,String> activeGossipProperties = new HashMap<>();
   
   private String pathToRingState = "./";
@@ -222,5 +225,12 @@ public class GossipSettings {
   public void setSignMessages(boolean signMessages) {
     this.signMessages = signMessages;
   }
-  
+
+  public String getTransportManagerClass() {
+    return transportManagerClass;
+  }
+
+  public String getProtocolManagerClass() {
+    return protocolManagerClass;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java 
b/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java
index d01a84c..e034432 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -29,16 +29,8 @@ import org.apache.gossip.model.*;
 import org.apache.gossip.udp.Trackable;
 import org.apache.log4j.Logger;
 
-import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetAddress;
 import java.net.URI;
-import java.security.*;
-import java.security.spec.InvalidKeySpecException;
-import java.security.spec.PKCS8EncodedKeySpec;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.concurrent.*;
@@ -60,8 +52,6 @@ public class GossipCore implements GossipCoreConstants {
   private final ConcurrentHashMap<String, ConcurrentHashMap<String, 
PerNodeDataMessage>> perNodeData;
   private final ConcurrentHashMap<String, SharedDataMessage> sharedData;
   private final BlockingQueue<Runnable> workQueue;
-  private final PKCS8EncodedKeySpec privKeySpec;
-  private final PrivateKey privKey;
   private final Meter messageSerdeException;
   private final Meter tranmissionException;
   private final Meter tranmissionSuccess;
@@ -79,42 +69,6 @@ public class GossipCore implements GossipCoreConstants {
     messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION);
     tranmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION);
     tranmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS);
-
-    if (manager.getSettings().isSignMessages()){
-      File privateKey = new File(manager.getSettings().getPathToKeyStore(), 
manager.getMyself().getId());
-      File publicKey = new File(manager.getSettings().getPathToKeyStore(), 
manager.getMyself().getId() + ".pub");
-      if (!privateKey.exists()){
-        throw new IllegalArgumentException("private key not found " + 
privateKey);
-      }
-      if (!publicKey.exists()){
-        throw new IllegalArgumentException("public key not found " + 
publicKey);
-      }
-      try (FileInputStream keyfis = new FileInputStream(privateKey)) {
-        byte[] encKey = new byte[keyfis.available()];
-        keyfis.read(encKey);
-        keyfis.close();
-        privKeySpec = new PKCS8EncodedKeySpec(encKey);
-        KeyFactory keyFactory = KeyFactory.getInstance("DSA");
-        privKey = keyFactory.generatePrivate(privKeySpec);
-      } catch (NoSuchAlgorithmException | InvalidKeySpecException | 
IOException e) {
-        throw new RuntimeException("failed hard", e);
-      }
-    } else {
-      privKeySpec = null;
-      privKey = null;
-    }
-  }
-
-  private byte [] sign(byte [] bytes){
-    Signature dsa;
-    try {
-      dsa = Signature.getInstance("SHA1withDSA", "SUN");
-      dsa.initSign(privKey);
-      dsa.update(bytes);
-      return dsa.sign();
-    } catch (NoSuchAlgorithmException | NoSuchProviderException | 
InvalidKeyException | SignatureException e) {
-      throw new RuntimeException(e);
-    } 
   }
 
   @SuppressWarnings({ "unchecked", "rawtypes" })
@@ -184,30 +138,21 @@ public class GossipCore implements GossipCoreConstants {
 
   /**
    * Sends a blocking message.
+   * todo: move functionality to TransportManager layer.
    * @param message
    * @param uri
    * @throws RuntimeException if data can not be serialized or in transmission 
error
    */
-  private void sendInternal(Base message, URI uri){
+  private void sendInternal(Base message, URI uri) {
     byte[] json_bytes;
     try {
-      if (privKey == null){
-        json_bytes = 
gossipManager.getObjectMapper().writeValueAsBytes(message);
-      } else {
-        SignedPayload p = new SignedPayload();
-        
p.setData(gossipManager.getObjectMapper().writeValueAsString(message).getBytes());
-        p.setSignature(sign(p.getData()));
-        json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(p);
-      }
+      json_bytes = gossipManager.getProtocolManager().write(message);
     } catch (IOException e) {
       messageSerdeException.mark();
       throw new RuntimeException(e);
     }
-    try (DatagramSocket socket = new DatagramSocket()) {
-      socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2);
-      InetAddress dest = InetAddress.getByName(uri.getHost());
-      DatagramPacket datagramPacket = new DatagramPacket(json_bytes, 
json_bytes.length, dest, uri.getPort());
-      socket.send(datagramPacket);
+    try {
+      gossipManager.getTransportManager().send(uri, json_bytes);
       tranmissionSuccess.mark();
     } catch (IOException e) {
       tranmissionException.mark();

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java 
b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
index ff70ccc..b1752cd 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -18,6 +18,7 @@
 package org.apache.gossip.manager;
 
 import com.codahale.metrics.MetricRegistry;
+import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.gossip.GossipSettings;
 import org.apache.gossip.LocalMember;
@@ -26,13 +27,14 @@ import org.apache.gossip.crdt.Crdt;
 import org.apache.gossip.event.GossipListener;
 import org.apache.gossip.event.GossipState;
 import org.apache.gossip.manager.handlers.MessageHandler;
-import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
 import org.apache.gossip.model.PerNodeDataMessage;
 import org.apache.gossip.model.SharedDataMessage;
+import org.apache.gossip.protocol.ProtocolManager;
+import org.apache.gossip.transport.TransportManager;
+import org.apache.gossip.utils.ReflectionUtils;
 import org.apache.log4j.Logger;
 
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
+import java.io.File;
 import java.net.URI;
 import java.util.Collections;
 import java.util.List;
@@ -46,14 +48,21 @@ import java.util.stream.Collectors;
 public abstract class GossipManager {
 
   public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
+  
+  // this mapper is used for ring and user-data persistence only. NOT messages.
+  public static final ObjectMapper metdataObjectMapper = new ObjectMapper() {{
+    enableDefaultTyping();
+    configure(JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, false);
+  }};
 
   private final ConcurrentSkipListMap<LocalMember, GossipState> members;
   private final LocalMember me;
   private final GossipSettings settings;
   private final AtomicBoolean gossipServiceRunning;
-  private AbstractActiveGossiper activeGossipThread;
-  private PassiveGossipThread passiveGossipThread;
-  private ExecutorService gossipThreadExecutor;
+  
+  private TransportManager transportManager;
+  private ProtocolManager protocolManager;
+  
   private final GossipCore gossipCore;
   private final DataReaper dataReaper;
   private final Clock clock;
@@ -62,14 +71,13 @@ public abstract class GossipManager {
   private final RingStatePersister ringState;
   private final UserDataPersister userDataState;
   private final GossipMemberStateRefresher memberStateRefresher;
-  private final ObjectMapper objectMapper;
-
+  
   private final MessageHandler messageHandler;
-
+  
   public GossipManager(String cluster,
                        URI uri, String id, Map<String, String> properties, 
GossipSettings settings,
                        List<Member> gossipMembers, GossipListener listener, 
MetricRegistry registry,
-                       ObjectMapper objectMapper, MessageHandler 
messageHandler) {
+                       MessageHandler messageHandler) {
     this.settings = settings;
     this.messageHandler = messageHandler;
 
@@ -89,14 +97,15 @@ public abstract class GossipManager {
         members.put(member, GossipState.DOWN);
       }
     }
-    gossipThreadExecutor = Executors.newCachedThreadPool();
     gossipServiceRunning = new AtomicBoolean(true);
     this.scheduledServiced = Executors.newScheduledThreadPool(1);
     this.registry = registry;
-    this.ringState = new RingStatePersister(this);
-    this.userDataState = new UserDataPersister(this, this.gossipCore);
+    this.ringState = new 
RingStatePersister(GossipManager.buildRingStatePath(this), this);
+    this.userDataState = new UserDataPersister(
+        gossipCore,
+        GossipManager.buildPerNodeDataPath(this),
+        GossipManager.buildSharedDataPath(this));
     this.memberStateRefresher = new GossipMemberStateRefresher(members, 
settings, listener, this::findPerNodeGossipData);
-    this.objectMapper = objectMapper;
     readSavedRingState();
     readSavedDataState();
   }
@@ -140,49 +149,66 @@ public abstract class GossipManager {
     return me;
   }
 
-  private AbstractActiveGossiper constructActiveGossiper(){
-    try {
-      Constructor<?> c = 
Class.forName(settings.getActiveGossipClass()).getConstructor(GossipManager.class,
 GossipCore.class, MetricRegistry.class);
-      return (AbstractActiveGossiper) c.newInstance(this, gossipCore, 
registry);
-    } catch (NoSuchMethodException | SecurityException | 
ClassNotFoundException | InstantiationException | IllegalAccessException | 
IllegalArgumentException | InvocationTargetException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
   /**
    * Starts the client. Specifically, start the various cycles for this 
protocol. Start the gossip
    * thread and start the receiver thread.
    */
   public void init() {
-    passiveGossipThread = new OnlyProcessReceivedPassiveGossipThread(this, 
gossipCore);
-    gossipThreadExecutor.execute(passiveGossipThread);
-    activeGossipThread = constructActiveGossiper();
-    activeGossipThread.init();
+    
+    // protocol manager and transport managers are specified in settings.
+    // construct them here via reflection.
+    
+    protocolManager = ReflectionUtils.constructWithReflection(
+        settings.getProtocolManagerClass(),
+        new Class<?>[] { GossipSettings.class, String.class, 
MetricRegistry.class },
+        new Object[] { settings, me.getId(), this.getRegistry() }
+    );
+    
+    transportManager = ReflectionUtils.constructWithReflection(
+        settings.getTransportManagerClass(),
+        new Class<?>[] { GossipManager.class, GossipCore.class},
+        new Object[] { this, gossipCore }
+    );
+    
+    // start processing gossip messages.
+    transportManager.startEndpoint();
+    transportManager.startActiveGossiper();
+    
     dataReaper.init();
-    scheduledServiced.scheduleAtFixedRate(ringState, 60, 60, TimeUnit.SECONDS);
-    scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, 
TimeUnit.SECONDS);
+    if (settings.isPersistRingState()) {
+      scheduledServiced.scheduleAtFixedRate(ringState, 60, 60, 
TimeUnit.SECONDS);
+    }
+    if (settings.isPersistDataState()) {
+      scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, 
TimeUnit.SECONDS);
+    }
     scheduledServiced.scheduleAtFixedRate(memberStateRefresher, 0, 100, 
TimeUnit.MILLISECONDS);
     LOGGER.debug("The GossipManager is started.");
   }
-
+  
   private void readSavedRingState() {
-    for (LocalMember l : ringState.readFromDisk()){
-      LocalMember member = new LocalMember(l.getClusterName(),
-              l.getUri(), l.getId(),
-              clock.nanoTime(), l.getProperties(), settings.getWindowSize(),
-              settings.getMinimumSamples(), settings.getDistribution());
-      members.putIfAbsent(member, GossipState.DOWN);
+    if (settings.isPersistRingState()) {
+      for (LocalMember l : ringState.readFromDisk()) {
+        LocalMember member = new LocalMember(l.getClusterName(),
+            l.getUri(), l.getId(),
+            clock.nanoTime(), l.getProperties(), settings.getWindowSize(),
+            settings.getMinimumSamples(), settings.getDistribution());
+        members.putIfAbsent(member, GossipState.DOWN);
+      }
     }
   }
 
   private void readSavedDataState() {
-    for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> l : 
userDataState.readPerNodeFromDisk().entrySet()){
-      for (Entry<String, PerNodeDataMessage> j : l.getValue().entrySet()){
-        gossipCore.addPerNodeData(j.getValue());
+    if (settings.isPersistDataState()) {
+      for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> l : 
userDataState.readPerNodeFromDisk().entrySet()) {
+        for (Entry<String, PerNodeDataMessage> j : l.getValue().entrySet()) {
+          gossipCore.addPerNodeData(j.getValue());
+        }
       }
     }
-    for (Entry<String, SharedDataMessage> l: 
userDataState.readSharedDataFromDisk().entrySet()){
-      gossipCore.addSharedData(l.getValue());
+    if (settings.isPersistRingState()) {
+      for (Entry<String, SharedDataMessage> l : 
userDataState.readSharedDataFromDisk().entrySet()) {
+        gossipCore.addSharedData(l.getValue());
+      }
     }
   }
 
@@ -191,24 +217,9 @@ public abstract class GossipManager {
    */
   public void shutdown() {
     gossipServiceRunning.set(false);
-    gossipThreadExecutor.shutdown();
     gossipCore.shutdown();
+    transportManager.shutdown();
     dataReaper.close();
-    if (passiveGossipThread != null) {
-      passiveGossipThread.shutdown();
-    }
-    if (activeGossipThread != null) {
-      activeGossipThread.shutdown();
-    }
-    try {
-      boolean result = gossipThreadExecutor.awaitTermination(10, 
TimeUnit.MILLISECONDS);
-      if (!result) {
-        LOGGER.error("executor shutdown timed out");
-      }
-    } catch (InterruptedException e) {
-      LOGGER.error(e);
-    }
-    gossipThreadExecutor.shutdownNow();
     scheduledServiced.shutdown();
     try {
       scheduledServiced.awaitTermination(1, TimeUnit.SECONDS);
@@ -234,7 +245,6 @@ public abstract class GossipManager {
     gossipCore.addSharedData(message);
   }
 
-
   @SuppressWarnings("rawtypes")
   public Crdt findCrdt(String key){
     SharedDataMessage l = gossipCore.getSharedData().get(key);
@@ -308,12 +318,32 @@ public abstract class GossipManager {
     return clock;
   }
 
-  public ObjectMapper getObjectMapper() {
-    return objectMapper;
-  }
-
   public MetricRegistry getRegistry() {
     return registry;
   }
 
+  public ProtocolManager getProtocolManager() {
+    return protocolManager;
+  }
+
+  public TransportManager getTransportManager() {
+    return transportManager;
+  }
+  
+  // todo: consider making these path methods part of GossipSettings
+  
+  public static File buildRingStatePath(GossipManager manager) {
+    return new File(manager.getSettings().getPathToRingState(), "ringstate." + 
manager.getMyself().getClusterName() + "."
+        + manager.getMyself().getId() + ".json");
+  }
+  
+  public static File buildSharedDataPath(GossipManager manager){
+    return new File(manager.getSettings().getPathToDataState(), "shareddata."
+            + manager.getMyself().getClusterName() + "." + 
manager.getMyself().getId() + ".json");
+  }
+  
+  public static File buildPerNodeDataPath(GossipManager manager) {
+    return new File(manager.getSettings().getPathToDataState(), "pernodedata."
+            + manager.getMyself().getClusterName() + "." + 
manager.getMyself().getId() + ".json");
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java 
b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
index bb73177..86dca57 100644
--- 
a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
+++ 
b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
@@ -18,12 +18,9 @@
 package org.apache.gossip.manager;
 
 import com.codahale.metrics.MetricRegistry;
-import com.fasterxml.jackson.core.JsonGenerator.Feature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.gossip.Member;
 import org.apache.gossip.GossipSettings;
 import org.apache.gossip.StartupSettings;
-import org.apache.gossip.crdt.CrdtModule;
 import org.apache.gossip.event.GossipListener;
 import org.apache.gossip.manager.handlers.MessageHandler;
 import org.apache.gossip.manager.handlers.MessageHandlerFactory;
@@ -49,7 +46,6 @@ public class GossipManagerBuilder {
     private GossipListener listener;
     private MetricRegistry registry;
     private Map<String,String> properties;
-    private ObjectMapper objectMapper;
     private MessageHandler messageHandler;
 
     private ManagerBuilder() {}
@@ -108,11 +104,6 @@ public class GossipManagerBuilder {
       this.uri = uri;
       return this;
     }
-    
-    public ManagerBuilder mapper(ObjectMapper objectMapper){
-      this.objectMapper = objectMapper;
-      return this;
-    }
 
     public ManagerBuilder messageHandler(MessageHandler messageHandler) {
       this.messageHandler = messageHandler;
@@ -136,16 +127,11 @@ public class GossipManagerBuilder {
       if (gossipMembers == null) {
         gossipMembers = new ArrayList<>();
       }
-      if (objectMapper == null) {
-        objectMapper = new ObjectMapper();
-        objectMapper.enableDefaultTyping();
-        objectMapper.registerModule(new CrdtModule());
-        objectMapper.configure(Feature.WRITE_NUMBERS_AS_STRINGS, false);
-      }
+      
       if (messageHandler == null) {
         messageHandler = MessageHandlerFactory.defaultHandler();
-      } 
-      return new GossipManager(cluster, uri, id, properties, settings, 
gossipMembers, listener, registry, objectMapper, messageHandler) {} ;
+      }
+      return new GossipManager(cluster, uri, id, properties, settings, 
gossipMembers, listener, registry, messageHandler) {} ;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java 
b/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
index ae28bf7..30e39d5 100644
--- 
a/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
+++ 
b/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
@@ -18,34 +18,25 @@
 package org.apache.gossip.manager;
 
 import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.SocketException;
+
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.gossip.model.Base;
-import org.apache.gossip.model.SignedPayload;
 import org.apache.log4j.Logger;
 
-import com.codahale.metrics.Meter;
 
 /**
  * This class handles the passive cycle,
  * where this client has received an incoming message. 
  */
-abstract public class PassiveGossipThread implements Runnable {
+public class PassiveGossipThread implements Runnable {
 
   public static final Logger LOGGER = 
Logger.getLogger(PassiveGossipThread.class);
 
-  /** The socket used for the passive thread of the gossip service. */
-  private final DatagramSocket server;
+  
   private final AtomicBoolean keepRunning;
   private final GossipCore gossipCore;
   private final GossipManager gossipManager;
-  private final Meter signed;
-  private final Meter unsigned;
 
   public PassiveGossipThread(GossipManager gossipManager, GossipCore 
gossipCore) {
     this.gossipManager = gossipManager;
@@ -53,38 +44,18 @@ abstract public class PassiveGossipThread implements 
Runnable {
     if (gossipManager.getMyself().getClusterName() == null){
       throw new IllegalArgumentException("Cluster was null");
     }
-    try {
-      SocketAddress socketAddress = new 
InetSocketAddress(gossipManager.getMyself().getUri().getHost(),
-              gossipManager.getMyself().getUri().getPort());
-      server = new DatagramSocket(socketAddress);
-    } catch (SocketException ex) {
-      LOGGER.warn(ex);
-      throw new RuntimeException(ex);
-    }
+    
     keepRunning = new AtomicBoolean(true);
-    signed = 
gossipManager.getRegistry().meter(PassiveGossipConstants.SIGNED_MESSAGE);
-    unsigned = 
gossipManager.getRegistry().meter(PassiveGossipConstants.UNSIGNED_MESSAGE);
   }
 
   @Override
   public void run() {
     while (keepRunning.get()) {
       try {
-        byte[] buf = new byte[server.getReceiveBufferSize()];
-        DatagramPacket p = new DatagramPacket(buf, buf.length);
-        server.receive(p);
-        debug(p.getData());
+        byte[] buf = gossipManager.getTransportManager().read();
         try {
-          Base activeGossipMessage = 
gossipManager.getObjectMapper().readValue(p.getData(), Base.class);
-          if (activeGossipMessage instanceof SignedPayload){
-            SignedPayload s = (SignedPayload) activeGossipMessage;
-            Base nested = 
gossipManager.getObjectMapper().readValue(s.getData(), Base.class);
-            gossipCore.receive(nested);
-            signed.mark();
-          } else {
-            gossipCore.receive(activeGossipMessage);
-            unsigned.mark();
-          }
+          Base message = gossipManager.getProtocolManager().read(buf);
+          gossipCore.receive(message);
           gossipManager.getMemberStateRefresher().run();
         } catch (RuntimeException ex) {//TODO trap json exception
           LOGGER.error("Unable to process message", ex);
@@ -94,21 +65,9 @@ abstract public class PassiveGossipThread implements 
Runnable {
         keepRunning.set(false);
       }
     }
-    shutdown();
-  }
-
-  private void debug(byte[] jsonBytes) {
-    if (LOGGER.isDebugEnabled()){
-      String receivedMessage = new String(jsonBytes);
-      LOGGER.debug("Received message ( bytes): " + receivedMessage);
-    }
   }
-
-  public void shutdown() {
-    try {
-      server.close();
-    } catch (RuntimeException ex) {
-    }
+  
+  public void requestStop() {
+    keepRunning.set(false);
   }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java 
b/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java
index 7e42562..0af9f12 100644
--- 
a/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java
+++ 
b/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java
@@ -26,16 +26,24 @@ import java.util.Collections;
 import java.util.List;
 import java.util.NavigableSet;
 
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.gossip.LocalMember;
+import org.apache.gossip.crdt.CrdtModule;
 import org.apache.log4j.Logger;
 
 public class RingStatePersister implements Runnable {
 
   private static final Logger LOGGER = 
Logger.getLogger(RingStatePersister.class);
-  private GossipManager parent;
+  private final File path;
+  // NOTE: this is a different instance than what gets used for message 
marshalling.
+  private final ObjectMapper objectMapper;
+  private final GossipManager manager;
   
-  public RingStatePersister(GossipManager parent){
-    this.parent = parent;
+  public RingStatePersister(File path, GossipManager manager){
+    this.path = path;
+    this.objectMapper = GossipManager.metdataObjectMapper;
+    this.manager = manager;
   }
   
   @Override
@@ -43,34 +51,25 @@ public class RingStatePersister implements Runnable {
     writeToDisk();
   }
   
-  File computeTarget(){
-    return new File(parent.getSettings().getPathToRingState(), "ringstate." + 
parent.getMyself().getClusterName() + "." 
-            + parent.getMyself().getId() + ".json");
-  }
-  
-  void writeToDisk(){
-    if (!parent.getSettings().isPersistRingState()){
-      return;
-    }
-    NavigableSet<LocalMember> i = parent.getMembers().keySet();
-    try (FileOutputStream fos = new FileOutputStream(computeTarget())){
-      parent.getObjectMapper().writeValue(fos, i);
+  void writeToDisk() {
+    NavigableSet<LocalMember> i = manager.getMembers().keySet();
+    try (FileOutputStream fos = new FileOutputStream(path)){
+      objectMapper.writeValue(fos, i);
     } catch (IOException e) {
       LOGGER.debug(e);
     }
   }
 
   @SuppressWarnings("unchecked")
-  List<LocalMember> readFromDisk(){
-    if (!parent.getSettings().isPersistRingState()){
-      return Collections.emptyList();
+  List<LocalMember> readFromDisk() {
+    if (!path.exists()) {
+      return new ArrayList<>();
     }
-    try (FileInputStream fos = new FileInputStream(computeTarget())){
-      return parent.getObjectMapper().readValue(fos, ArrayList.class);
+    try (FileInputStream fos = new FileInputStream(path)){
+      return objectMapper.readValue(fos, ArrayList.class);
     } catch (IOException e) {
       LOGGER.debug(e);
     }
-    return Collections.emptyList();
+    return new ArrayList<>();
   }
-  
 }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/main/java/org/apache/gossip/manager/UserDataPersister.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/main/java/org/apache/gossip/manager/UserDataPersister.java 
b/gossip-base/src/main/java/org/apache/gossip/manager/UserDataPersister.java
index 3b9eafa..28c3151 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/UserDataPersister.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/UserDataPersister.java
@@ -23,6 +23,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.concurrent.ConcurrentHashMap;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.gossip.model.PerNodeDataMessage;
 import org.apache.gossip.model.SharedDataMessage;
 import org.apache.log4j.Logger;
@@ -30,31 +31,26 @@ import org.apache.log4j.Logger;
 public class UserDataPersister implements Runnable {
   
   private static final Logger LOGGER = 
Logger.getLogger(UserDataPersister.class);
-  private final GossipManager parent;
   private final GossipCore gossipCore; 
   
-  UserDataPersister(GossipManager parent, GossipCore gossipCore){
-    this.parent = parent;
-    this.gossipCore = gossipCore;
-  }
-  
-  File computeSharedTarget(){
-    return new File(parent.getSettings().getPathToDataState(), "shareddata."
-            + parent.getMyself().getClusterName() + "." + 
parent.getMyself().getId() + ".json");
-  }
+  private final File perNodePath;
+  private final File sharedPath;
+  private final ObjectMapper objectMapper;
   
-  File computePerNodeTarget() {
-    return new File(parent.getSettings().getPathToDataState(), "pernodedata."
-            + parent.getMyself().getClusterName() + "." + 
parent.getMyself().getId() + ".json");
+  UserDataPersister(GossipCore gossipCore, File perNodePath, File sharedPath) {
+    this.gossipCore = gossipCore;
+    this.objectMapper = GossipManager.metdataObjectMapper;
+    this.perNodePath = perNodePath;
+    this.sharedPath = sharedPath;
   }
   
   @SuppressWarnings("unchecked")
   ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> 
readPerNodeFromDisk(){
-    if (!parent.getSettings().isPersistDataState()){
+    if (!perNodePath.exists()) {
       return new ConcurrentHashMap<String, ConcurrentHashMap<String, 
PerNodeDataMessage>>();
     }
-    try (FileInputStream fos = new FileInputStream(computePerNodeTarget())){
-      return parent.getObjectMapper().readValue(fos, ConcurrentHashMap.class);
+    try (FileInputStream fos = new FileInputStream(perNodePath)){
+      return objectMapper.readValue(fos, ConcurrentHashMap.class);
     } catch (IOException e) {
       LOGGER.debug(e);
     }
@@ -62,22 +58,16 @@ public class UserDataPersister implements Runnable {
   }
   
   void writePerNodeToDisk(){
-    if (!parent.getSettings().isPersistDataState()){
-      return;
-    }
-    try (FileOutputStream fos = new FileOutputStream(computePerNodeTarget())){
-      parent.getObjectMapper().writeValue(fos, gossipCore.getPerNodeData());
+    try (FileOutputStream fos = new FileOutputStream(perNodePath)){
+      objectMapper.writeValue(fos, gossipCore.getPerNodeData());
     } catch (IOException e) {
       LOGGER.warn(e);
     }
   }
   
   void writeSharedToDisk(){
-    if (!parent.getSettings().isPersistDataState()){
-      return;
-    }
-    try (FileOutputStream fos = new FileOutputStream(computeSharedTarget())){
-      parent.getObjectMapper().writeValue(fos, gossipCore.getSharedData());
+    try (FileOutputStream fos = new FileOutputStream(sharedPath)){
+      objectMapper.writeValue(fos, gossipCore.getSharedData());
     } catch (IOException e) {
       LOGGER.warn(e);
     }
@@ -85,11 +75,11 @@ public class UserDataPersister implements Runnable {
 
   @SuppressWarnings("unchecked")
   ConcurrentHashMap<String, SharedDataMessage> readSharedDataFromDisk(){
-    if (!parent.getSettings().isPersistRingState()){
-      return new ConcurrentHashMap<String, SharedDataMessage>();
+    if (!sharedPath.exists()) {
+      return new ConcurrentHashMap<>();
     }
-    try (FileInputStream fos = new FileInputStream(computeSharedTarget())){
-      return parent.getObjectMapper().readValue(fos, ConcurrentHashMap.class);
+    try (FileInputStream fos = new FileInputStream(sharedPath)){
+      return objectMapper.readValue(fos, ConcurrentHashMap.class);
     } catch (IOException e) {
       LOGGER.debug(e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
 
b/gossip-base/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
deleted file mode 100644
index dff5056..0000000
--- 
a/gossip-base/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.manager.impl;
-
-import org.apache.gossip.manager.GossipCore;
-import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.manager.PassiveGossipThread;
-import org.apache.log4j.Logger;
-
-public class OnlyProcessReceivedPassiveGossipThread extends 
PassiveGossipThread {
-  
-  public static final Logger LOGGER = 
Logger.getLogger(OnlyProcessReceivedPassiveGossipThread.class);
-
-  public OnlyProcessReceivedPassiveGossipThread(GossipManager gossipManager, 
GossipCore gossipCore) {
-    super(gossipManager, gossipCore);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/main/java/org/apache/gossip/protocol/JacksonProtocolManager.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/main/java/org/apache/gossip/protocol/JacksonProtocolManager.java
 
b/gossip-base/src/main/java/org/apache/gossip/protocol/JacksonProtocolManager.java
new file mode 100644
index 0000000..91ed7f9
--- /dev/null
+++ 
b/gossip-base/src/main/java/org/apache/gossip/protocol/JacksonProtocolManager.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.protocol;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.crdt.CrdtModule;
+import org.apache.gossip.manager.PassiveGossipConstants;
+import org.apache.gossip.model.Base;
+import org.apache.gossip.model.SignedPayload;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.InvalidKeyException;
+import java.security.KeyFactory;
+import java.security.NoSuchAlgorithmException;
+import java.security.NoSuchProviderException;
+import java.security.PrivateKey;
+import java.security.Signature;
+import java.security.SignatureException;
+import java.security.spec.InvalidKeySpecException;
+import java.security.spec.PKCS8EncodedKeySpec;
+
+/**
+ * {@link ProtocolManager} implementation that marshals gossip messages as JSON using Jackson.
+ *
+ * <p>When message signing is enabled in the settings, every outgoing message is wrapped in a
+ * {@link SignedPayload} carrying a DSA signature produced with this node's private key.
+ *
+ * <p>This class is constructed by reflection in GossipManager.
+ */
+public class JacksonProtocolManager implements ProtocolManager {
+
+  private final ObjectMapper objectMapper;
+  /** Key used to sign outgoing messages; {@code null} when signing is disabled. */
+  private final PrivateKey privKey;
+  /** Counts incoming messages that arrived wrapped in a {@link SignedPayload}. */
+  private final Meter signed;
+  /** Counts incoming messages that arrived without a signature wrapper. */
+  private final Meter unsigned;
+
+  /**
+   * Required for reflection to work!
+   *
+   * @param settings gossip settings; consulted for the signing flag and the key store path.
+   * @param id node id; the private key is looked up as {@code <keystore>/<id>} and the public
+   *           key as {@code <keystore>/<id>.pub}.
+   * @param registry registry in which the signed/unsigned message meters are registered.
+   * @throws IllegalArgumentException if signing is enabled and either key file is missing.
+   * @throws RuntimeException if the private key cannot be read or decoded.
+   */
+  public JacksonProtocolManager(GossipSettings settings, String id, MetricRegistry registry) {
+    // set up object mapper.
+    objectMapper = buildObjectMapper(settings);
+
+    // set up message signing.
+    if (settings.isSignMessages()){
+      File privateKey = new File(settings.getPathToKeyStore(), id);
+      File publicKey = new File(settings.getPathToKeyStore(), id + ".pub");
+      if (!privateKey.exists()){
+        throw new IllegalArgumentException("private key not found " + privateKey);
+      }
+      if (!publicKey.exists()){
+        throw new IllegalArgumentException("public key not found " + publicKey);
+      }
+      try {
+        // Read the complete key file. A single InputStream.read() into a buffer sized by
+        // available() is not guaranteed to return every byte of the file.
+        byte[] encKey = java.nio.file.Files.readAllBytes(privateKey.toPath());
+        PKCS8EncodedKeySpec privKeySpec = new PKCS8EncodedKeySpec(encKey);
+        KeyFactory keyFactory = KeyFactory.getInstance("DSA");
+        privKey = keyFactory.generatePrivate(privKeySpec);
+      } catch (NoSuchAlgorithmException | InvalidKeySpecException | IOException e) {
+        throw new RuntimeException("failed hard", e);
+      }
+    } else {
+      privKey = null;
+    }
+
+    signed = registry.meter(PassiveGossipConstants.SIGNED_MESSAGE);
+    unsigned = registry.meter(PassiveGossipConstants.UNSIGNED_MESSAGE);
+  }
+
+  /**
+   * Serializes a message to JSON bytes, wrapping it in a {@link SignedPayload} when a private
+   * key has been loaded.
+   *
+   * @param message the message to serialize.
+   * @return the JSON encoding of the message (or of the signed wrapper around it).
+   * @throws IOException if Jackson fails to serialize the message.
+   */
+  @Override
+  public byte[] write(Base message) throws IOException {
+    byte[] json_bytes;
+    if (privKey == null){
+      json_bytes = objectMapper.writeValueAsBytes(message);
+    } else {
+      SignedPayload p = new SignedPayload();
+      // writeValueAsBytes always emits UTF-8; String.getBytes() would depend on the platform
+      // default charset and could produce bytes the receiving ObjectMapper mis-decodes.
+      p.setData(objectMapper.writeValueAsBytes(message));
+      p.setSignature(sign(p.getData(), privKey));
+      json_bytes = objectMapper.writeValueAsBytes(p);
+    }
+    return json_bytes;
+  }
+
+  /**
+   * Deserializes a message, unwrapping it first if it arrived inside a {@link SignedPayload}.
+   * Marks the signed or unsigned meter accordingly.
+   *
+   * <p>NOTE: this method does not verify the signature of a signed payload; it only unwraps
+   * the enclosed data.
+   *
+   * @param buf the serialized message.
+   * @return the decoded gossip message.
+   * @throws IOException if the bytes cannot be parsed.
+   */
+  @Override
+  public Base read(byte[] buf) throws IOException {
+    Base activeGossipMessage = objectMapper.readValue(buf, Base.class);
+    if (activeGossipMessage instanceof SignedPayload){
+      SignedPayload s = (SignedPayload) activeGossipMessage;
+      signed.mark();
+      return objectMapper.readValue(s.getData(), Base.class);
+    } else {
+      unsigned.mark();
+      return activeGossipMessage;
+    }
+  }
+
+  /**
+   * Builds the ObjectMapper used for gossip messages: default typing enabled, the CRDT module
+   * registered, and numbers written as JSON numbers rather than strings.
+   *
+   * @param settings currently unused by this method.
+   * @return a newly configured mapper.
+   */
+  public static ObjectMapper buildObjectMapper(GossipSettings settings) {
+    ObjectMapper om = new ObjectMapper();
+    om.enableDefaultTyping();
+    // todo: should be specified in the configuration.
+    om.registerModule(new CrdtModule());
+    om.configure(JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, false);
+    return om;
+  }
+
+  /**
+   * Signs the given bytes with SHA1withDSA from the "SUN" provider.
+   *
+   * @param bytes data to sign.
+   * @param pk private key to sign with.
+   * @return the signature bytes.
+   * @throws RuntimeException wrapping any failure of the signing machinery.
+   */
+  private static byte[] sign(byte [] bytes, PrivateKey pk){
+    Signature dsa;
+    try {
+      dsa = Signature.getInstance("SHA1withDSA", "SUN");
+      dsa.initSign(pk);
+      dsa.update(bytes);
+      return dsa.sign();
+    } catch (NoSuchAlgorithmException | NoSuchProviderException | InvalidKeyException | SignatureException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/main/java/org/apache/gossip/protocol/ProtocolManager.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/main/java/org/apache/gossip/protocol/ProtocolManager.java 
b/gossip-base/src/main/java/org/apache/gossip/protocol/ProtocolManager.java
new file mode 100644
index 0000000..0f553c7
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/protocol/ProtocolManager.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.protocol;
+
+import org.apache.gossip.model.Base;
+
+import java.io.IOException;
+
+/**
+ * Interface for managing message marshaling: converting gossip messages to and from
+ * their serialized byte form.
+ */
+public interface ProtocolManager {
+
+  /**
+   * Serializes a message.
+   * @param message the gossip message to serialize.
+   * @return serialized message.
+   * @throws IOException if the message cannot be serialized.
+   */
+  byte[] write(Base message) throws IOException;
+
+  /**
+   * Reads the next message from a byte source.
+   * @param buf the serialized bytes of one message.
+   * @return a gossip message.
+   * @throws IOException if the bytes cannot be deserialized.
+   */
+  Base read(byte[] buf) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java
 
b/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java
new file mode 100644
index 0000000..497e605
--- /dev/null
+++ 
b/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.transport;
+
+import com.codahale.metrics.MetricRegistry;
+import org.apache.gossip.manager.AbstractActiveGossiper;
+import org.apache.gossip.manager.GossipCore;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.PassiveGossipThread;
+import org.apache.gossip.utils.ReflectionUtils;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Manage the protocol threads (active and passive gossipers).
+ *
+ * <p>Subclasses supply the actual byte transport ({@code send}/{@code read}); this base class
+ * owns the passive gossip thread, the executor it runs on, and the active gossiper, which is
+ * instantiated by reflection from the class name configured in the gossip settings.
+ */
+public abstract class AbstractTransportManager implements TransportManager {
+
+  public static final Logger LOGGER = Logger.getLogger(AbstractTransportManager.class);
+
+  private final PassiveGossipThread passiveGossipThread;
+  private final ExecutorService gossipThreadExecutor;
+
+  private final AbstractActiveGossiper activeGossipThread;
+
+  /**
+   * Creates the passive gossip thread and the active gossiper. Nothing is started here;
+   * see {@link #startEndpoint()} and {@link #startActiveGossiper()}.
+   *
+   * @param gossipManager manager supplying the settings and the metric registry.
+   * @param gossipCore core handed to both gossipers.
+   */
+  public AbstractTransportManager(GossipManager gossipManager, GossipCore gossipCore) {
+
+    passiveGossipThread = new PassiveGossipThread(gossipManager, gossipCore);
+    gossipThreadExecutor = Executors.newCachedThreadPool();
+    activeGossipThread = ReflectionUtils.constructWithReflection(
+      gossipManager.getSettings().getActiveGossipClass(),
+        new Class<?>[]{
+            GossipManager.class, GossipCore.class, MetricRegistry.class
+        },
+        new Object[]{
+            gossipManager, gossipCore, gossipManager.getRegistry()
+        });
+  }
+
+  /**
+   * Shuts down threads etc.: asks the passive thread to stop, shuts down the active gossiper,
+   * waits briefly for the executor to drain and then forces it down.
+   */
+  @Override
+  public void shutdown() {
+    passiveGossipThread.requestStop();
+    gossipThreadExecutor.shutdown();
+    if (activeGossipThread != null) {
+      activeGossipThread.shutdown();
+    }
+    try {
+      boolean result = gossipThreadExecutor.awaitTermination(10, TimeUnit.MILLISECONDS);
+      if (!result) {
+        LOGGER.error("executor shutdown timed out");
+      }
+    } catch (InterruptedException e) {
+      LOGGER.error(e);
+      // Restore the interrupt flag so callers further up the stack can still observe it.
+      Thread.currentThread().interrupt();
+    }
+    gossipThreadExecutor.shutdownNow();
+  }
+
+  /** Initializes the active gossiper. */
+  @Override
+  public void startActiveGossiper() {
+    activeGossipThread.init();
+  }
+
+  /** Submits the passive gossip thread to the executor. */
+  @Override
+  public void startEndpoint() {
+    gossipThreadExecutor.execute(passiveGossipThread);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/main/java/org/apache/gossip/transport/TransportManager.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/main/java/org/apache/gossip/transport/TransportManager.java 
b/gossip-base/src/main/java/org/apache/gossip/transport/TransportManager.java
new file mode 100644
index 0000000..031d90e
--- /dev/null
+++ 
b/gossip-base/src/main/java/org/apache/gossip/transport/TransportManager.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.transport;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Interface for a manager that sends and receives messages that have already
+ * been serialized.
+ */
+public interface TransportManager {
+  
+  /**
+   * Starts the active gossip thread responsible for reaching out to remote
+   * nodes. Not related to `startEndpoint()`.
+   */
+  void startActiveGossiper();
+  
+  /**
+   * Starts the passive gossip thread that receives messages from remote
+   * nodes. Not related to `startActiveGossiper()`.
+   */
+  void startEndpoint();
+  
+  /** Attempts to shut down all threads. */
+  void shutdown();
+  
+  /**
+   * Sends a payload to an endpoint.
+   * @param endpoint URI of the destination node.
+   * @param buf the already-serialized payload.
+   * @throws IOException if the payload cannot be sent.
+   */
+  void send(URI endpoint, byte[] buf) throws IOException;
+  
+  /**
+   * Gets the next payload being sent to this node.
+   * @return the serialized payload.
+   * @throws IOException if the payload cannot be read.
+   */
+  byte[] read() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/main/java/org/apache/gossip/transport/UdpTransportManager.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/main/java/org/apache/gossip/transport/UdpTransportManager.java
 
b/gossip-base/src/main/java/org/apache/gossip/transport/UdpTransportManager.java
new file mode 100644
index 0000000..d80deec
--- /dev/null
+++ 
b/gossip-base/src/main/java/org/apache/gossip/transport/UdpTransportManager.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.transport;
+
+import org.apache.gossip.manager.GossipCore;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.net.URI;
+
+/**
+ * This class is constructed by reflection in GossipManager.
+ * It manages transport (byte read/write) operations over UDP.
+ */
+public class UdpTransportManager extends AbstractTransportManager {
+
+  public static final Logger LOGGER = Logger.getLogger(UdpTransportManager.class);
+
+  /** The socket used for the passive thread of the gossip service. */
+  private final DatagramSocket server;
+
+  /** Timeout set on each outgoing socket: twice the configured gossip interval. */
+  private final int soTimeout;
+
+  /**
+   * Required for reflection to work! Binds the server socket to the host and port of this
+   * node's own URI.
+   *
+   * @param gossipManager manager supplying the settings and this node's URI.
+   * @param gossipCore core passed through to the base class.
+   * @throws RuntimeException if the server socket cannot be created.
+   */
+  public UdpTransportManager(GossipManager gossipManager, GossipCore gossipCore) {
+    super(gossipManager, gossipCore);
+
+    soTimeout = gossipManager.getSettings().getGossipInterval() * 2;
+
+    try {
+      SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getUri().getHost(),
+              gossipManager.getMyself().getUri().getPort());
+      server = new DatagramSocket(socketAddress);
+    } catch (SocketException ex) {
+      LOGGER.warn(ex);
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /** Closes the server socket, then shuts down the gossip threads. */
+  @Override
+  public void shutdown() {
+    server.close();
+    super.shutdown();
+  }
+
+  /**
+   * Blocking read of a message.
+   * @return buffer holding exactly the bytes of the received datagram.
+   * @throws IOException if receiving from the server socket fails.
+   */
+  @Override
+  public byte[] read() throws IOException {
+    byte[] buf = new byte[server.getReceiveBufferSize()];
+    DatagramPacket p = new DatagramPacket(buf, buf.length);
+    server.receive(p);
+    // getData() is the whole receive buffer; only the first getLength() bytes were actually
+    // received, so hand back just those instead of the zero-padded buffer.
+    byte[] received = java.util.Arrays.copyOf(p.getData(), p.getLength());
+    debug(received);
+    return received;
+  }
+
+  /**
+   * Sends one datagram containing {@code buf} to the host and port of {@code endpoint}.
+   * @param endpoint URI of the destination node.
+   * @param buf the payload to send.
+   * @throws IOException if the host cannot be resolved or the datagram cannot be sent.
+   */
+  @Override
+  public void send(URI endpoint, byte[] buf) throws IOException {
+    InetAddress dest = InetAddress.getByName(endpoint.getHost());
+    DatagramPacket payload = new DatagramPacket(buf, buf.length, dest, endpoint.getPort());
+    // try-with-resources guarantees the socket is closed even when send() throws.
+    // todo: investigate UDP socket reuse. It would save a little setup/teardown time wrt to the local socket.
+    try (DatagramSocket socket = new DatagramSocket()) {
+      socket.setSoTimeout(soTimeout);
+      socket.send(payload);
+    }
+  }
+
+  /** Logs the received payload as text when debug logging is enabled. */
+  private void debug(byte[] jsonBytes) {
+    if (LOGGER.isDebugEnabled()){
+      // NOTE(review): decodes as UTF-8, which matches Jackson's byte output; a different
+      // protocol implementation may send bytes that are not text at all.
+      String receivedMessage = new String(jsonBytes, java.nio.charset.StandardCharsets.UTF_8);
+      LOGGER.debug("Received message (" + jsonBytes.length + " bytes): " + receivedMessage);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/main/java/org/apache/gossip/utils/ReflectionUtils.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/main/java/org/apache/gossip/utils/ReflectionUtils.java 
b/gossip-base/src/main/java/org/apache/gossip/utils/ReflectionUtils.java
new file mode 100644
index 0000000..2ae4eb1
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/utils/ReflectionUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.utils;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+public class ReflectionUtils {
+
+  /**
+   * Instantiates the named class through the public constructor matching the given parameter
+   * types. Exists so call sites do not have to repeat the reflective exception handling.
+   *
+   * <p>If the constructor itself throws a RuntimeException, that exception is rethrown as-is;
+   * every other reflective failure is wrapped in a RuntimeException.
+   *
+   * @param className fully qualified name of the class to instantiate.
+   * @param constructorTypes parameter types identifying the constructor.
+   * @param constructorArgs arguments passed to the constructor.
+   * @param <T> type the caller expects back (unchecked).
+   * @return constructed instance of a thing.
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> T constructWithReflection(String className, Class<?>[] constructorTypes, Object[] constructorArgs) {
+    try {
+      Class<?> target = Class.forName(className);
+      Constructor<?> ctor = target.getConstructor(constructorTypes);
+      ctor.setAccessible(true);
+      Object instance = ctor.newInstance(constructorArgs);
+      return (T) instance;
+    } catch (InvocationTargetException ite) {
+      // The constructor threw: surface its RuntimeException directly, wrap anything else.
+      Throwable cause = ite.getTargetException();
+      if (cause instanceof RuntimeException) {
+        throw (RuntimeException) cause;
+      }
+      throw new RuntimeException(ite);
+    } catch (ReflectiveOperationException roe) {
+      // Note: none of the exceptions caught here should descend from RuntimeException.
+      //       Otherwise, we're just wrapping and making stack traces confusing.
+      throw new RuntimeException(roe);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java 
b/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java
index b19f221..4c6014a 100644
--- a/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java
+++ b/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java
@@ -18,15 +18,14 @@
 package org.apache.gossip.crdt;
 
 import java.io.IOException;
-import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.gossip.GossipSettings;
-import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.manager.GossipManagerBuilder;
+import org.apache.gossip.protocol.JacksonProtocolManager;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -88,16 +87,11 @@ public class OrSetTest {
   
   @Test
   public void serialTest() throws InterruptedException, URISyntaxException, 
IOException {
-    GossipManager gossipService2 = GossipManagerBuilder.newBuilder()
-            .cluster("a")
-            .uri(new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)))
-            .id("1")
-            .gossipSettings(new GossipSettings())
-            .build();
+    ObjectMapper objectMapper = JacksonProtocolManager.buildObjectMapper(new 
GossipSettings());
     OrSet<Integer> i = new OrSet<Integer>(new 
OrSet.Builder<Integer>().add(1).remove(1));
-    String s = gossipService2.getObjectMapper().writeValueAsString(i);
+    String s = objectMapper.writeValueAsString(i);
     @SuppressWarnings("unchecked")
-    OrSet<Integer> back = gossipService2.getObjectMapper().readValue(s, 
OrSet.class);
+    OrSet<Integer> back = objectMapper.readValue(s, OrSet.class);
     Assert.assertEquals(back, i);
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java
 
b/gossip-base/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java
index 959f818..bc0b46a 100644
--- 
a/gossip-base/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java
+++ 
b/gossip-base/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java
@@ -25,6 +25,7 @@ import org.apache.gossip.manager.handlers.MessageHandler;
 import org.apache.gossip.manager.handlers.ResponseHandler;
 import org.apache.gossip.manager.handlers.TypedMessageHandler;
 import org.junit.Assert;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.platform.runner.JUnitPlatform;
 import org.junit.runner.RunWith;
@@ -43,6 +44,17 @@ import static org.junit.jupiter.api.Assertions.expectThrows;
 @RunWith(JUnitPlatform.class)
 public class GossipManagerBuilderTest {
 
+  private GossipManagerBuilder.ManagerBuilder builder;
+  
+  @BeforeEach
+  public void setup() throws Exception {
+    builder = GossipManagerBuilder.newBuilder()
+        .id("id")
+        .cluster("aCluster")
+        .uri(new URI("udp://localhost:2000"))
+        .gossipSettings(new GossipSettings());
+  }
+  
   @Test
   public void idShouldNotBeNull() {
     expectThrows(IllegalArgumentException.class,() -> {
@@ -66,35 +78,20 @@ public class GossipManagerBuilderTest {
   
   @Test
   public void createMembersListIfNull() throws URISyntaxException {
-    GossipManager gossipManager = GossipManagerBuilder.newBuilder()
-        .id("id")
-        .cluster("aCluster")
-        .uri(new URI("udp://localhost:2000"))
-        .gossipSettings(new GossipSettings())
-        .gossipMembers(null).registry(new MetricRegistry()).build();
+    GossipManager gossipManager = builder.gossipMembers(null).registry(new 
MetricRegistry()).build();
     assertNotNull(gossipManager.getLiveMembers());
   }
 
   @Test
   public void createDefaultMessageHandlerIfNull() throws URISyntaxException {
-    GossipManager gossipManager = GossipManagerBuilder.newBuilder()
-        .id("id")
-        .cluster("aCluster")
-        .uri(new URI("udp://localhost:2000"))
-        .gossipSettings(new GossipSettings())
-        .messageHandler(null).registry(new MetricRegistry()).build();
+    GossipManager gossipManager = builder.messageHandler(null).registry(new 
MetricRegistry()).build();
     assertNotNull(gossipManager.getMessageHandler());
   }
 
   @Test
   public void testMessageHandlerKeeping() throws URISyntaxException {
     MessageHandler mi = new TypedMessageHandler(Response.class, new 
ResponseHandler());
-    GossipManager gossipManager = GossipManagerBuilder.newBuilder()
-        .id("id")
-        .cluster("aCluster")
-        .uri(new URI("udp://localhost:2000"))
-        .gossipSettings(new GossipSettings())
-        .messageHandler(mi).registry(new MetricRegistry()).build();
+    GossipManager gossipManager = builder.messageHandler(mi).registry(new 
MetricRegistry()).build();
     assertNotNull(gossipManager.getMessageHandler());
     Assert.assertEquals(gossipManager.getMessageHandler(), mi);
   }
@@ -106,10 +103,7 @@ public class GossipManagerBuilderTest {
             System.nanoTime(), new HashMap<String, String>(), 1000, 1, 
"exponential");
     List<Member> memberList = new ArrayList<>();
     memberList.add(member);
-    GossipManager gossipManager = GossipManagerBuilder.newBuilder()
-        .id("id")
-        .cluster("aCluster")
-        .gossipSettings(new GossipSettings())
+    GossipManager gossipManager = builder
         .uri(new URI("udp://localhost:8000"))
         .gossipMembers(memberList).registry(new MetricRegistry()).build();
     assertEquals(1, gossipManager.getDeadMembers().size());

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/test/java/org/apache/gossip/manager/RingPersistenceTest.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/test/java/org/apache/gossip/manager/RingPersistenceTest.java 
b/gossip-base/src/test/java/org/apache/gossip/manager/RingPersistenceTest.java
index d448b98..ebe0e2c 100644
--- 
a/gossip-base/src/test/java/org/apache/gossip/manager/RingPersistenceTest.java
+++ 
b/gossip-base/src/test/java/org/apache/gossip/manager/RingPersistenceTest.java
@@ -49,7 +49,7 @@ public class RingPersistenceTest {
                             new RemoteMember("a", new URI("udp://" + 
"127.0.0.1" + ":" + (29000 + 0)), "0"),
                             new RemoteMember("a", new URI("udp://" + 
"127.0.0.1" + ":" + (29000 + 2)), "2"))).build();
     gossipService.getRingState().writeToDisk();
-    return gossipService.getRingState().computeTarget();
+    return GossipManager.buildRingStatePath(gossipService);
   }
   
   private void aNewInstanceGetsRingInfo(GossipSettings settings) throws 
UnknownHostException, InterruptedException, URISyntaxException {

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java
 
b/gossip-base/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java
index 7b17e41..dde4b74 100644
--- 
a/gossip-base/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java
+++ 
b/gossip-base/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java
@@ -68,8 +68,8 @@ public class UserDataPersistenceTest {
       gossipService.init();
       Assert.assertEquals("red", ((AToothpick) 
gossipService.findPerNodeGossipData(nodeId, "a").getPayload()).getColor());
       Assert.assertEquals("blue", ((AToothpick) 
gossipService.findSharedGossipData("a").getPayload()).getColor());
-      File f = gossipService.getUserDataState().computeSharedTarget();
-      File g = gossipService.getUserDataState().computePerNodeTarget();
+      File f = GossipManager.buildSharedDataPath(gossipService);
+      File g = GossipManager.buildPerNodeDataPath(gossipService);
       gossipService.shutdown();
       f.delete();
       g.delete();

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java
 
b/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java
index c035d21..ec91d67 100644
--- 
a/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java
+++ 
b/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java
@@ -64,7 +64,7 @@ public class MessageHandlerTest {
       return true;
     }
   }
-
+      
   @Test
   public void testSimpleHandler() {
     MessageHandler mi = new TypedMessageHandler(FakeMessage.class, new 
FakeMessageHandler());

Reply via email to