http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
 
b/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
new file mode 100644
index 0000000..b73550e
--- /dev/null
+++ 
b/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+import java.util.Map.Entry;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.gossip.LocalMember;
+import org.apache.gossip.model.ActiveGossipOk;
+import org.apache.gossip.model.PerNodeDataMessage;
+import org.apache.gossip.model.Member;
+import org.apache.gossip.model.Response;
+import org.apache.gossip.model.SharedDataMessage;
+import org.apache.gossip.model.ShutdownMessage;
+import org.apache.gossip.udp.UdpActiveGossipMessage;
+import org.apache.gossip.udp.UdpPerNodeDataMessage;
+import org.apache.gossip.udp.UdpSharedDataMessage;
+import org.apache.log4j.Logger;
+
+import static com.codahale.metrics.MetricRegistry.name;
+
+/**
+ * The ActiveGossipThread is sends information. Pick a random partner and send 
the membership list to that partner
+ */
+public abstract class AbstractActiveGossiper {
+
+  protected static final Logger LOGGER = 
Logger.getLogger(AbstractActiveGossiper.class);
+  
+  protected final GossipManager gossipManager;
+  protected final GossipCore gossipCore;
+  private final Histogram sharedDataHistogram;
+  private final Histogram sendPerNodeDataHistogram;
+  private final Histogram sendMembershipHistorgram;
+  private final Random random;
+
+  public AbstractActiveGossiper(GossipManager gossipManager, GossipCore 
gossipCore, MetricRegistry registry) {
+    this.gossipManager = gossipManager;
+    this.gossipCore = gossipCore;
+    sharedDataHistogram = 
registry.histogram(name(AbstractActiveGossiper.class, 
"sharedDataHistogram-time"));
+    sendPerNodeDataHistogram = 
registry.histogram(name(AbstractActiveGossiper.class, 
"sendPerNodeDataHistogram-time"));
+    sendMembershipHistorgram = 
registry.histogram(name(AbstractActiveGossiper.class, 
"sendMembershipHistorgram-time"));
+    random = new Random();
+  }
+
+  public void init() {
+
+  }
+  
+  public void shutdown() {
+
+  }
+
+  public final void sendShutdownMessage(LocalMember me, LocalMember target){
+    if (target == null){
+      return;
+    }
+    ShutdownMessage m = new ShutdownMessage();
+    m.setNodeId(me.getId());
+    m.setShutdownAtNanos(gossipManager.getClock().nanoTime());
+    gossipCore.sendOneWay(m, target.getUri());
+  }
+  
+  public final void sendSharedData(LocalMember me, LocalMember member){
+    if (member == null){
+      return;
+    }
+    long startTime = System.currentTimeMillis();
+    for (Entry<String, SharedDataMessage> innerEntry : 
gossipCore.getSharedData().entrySet()){
+      UdpSharedDataMessage message = new UdpSharedDataMessage();
+      message.setUuid(UUID.randomUUID().toString());
+      message.setUriFrom(me.getId());
+      message.setExpireAt(innerEntry.getValue().getExpireAt());
+      message.setKey(innerEntry.getValue().getKey());
+      message.setNodeId(innerEntry.getValue().getNodeId());
+      message.setTimestamp(innerEntry.getValue().getTimestamp());
+      message.setPayload(innerEntry.getValue().getPayload());
+      gossipCore.sendOneWay(message, member.getUri());
+    }
+    sharedDataHistogram.update(System.currentTimeMillis() - startTime);
+  }
+  
+  public final void sendPerNodeData(LocalMember me, LocalMember member){
+    if (member == null){
+      return;
+    }
+    long startTime = System.currentTimeMillis();
+    for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> entry : 
gossipCore.getPerNodeData().entrySet()){
+      for (Entry<String, PerNodeDataMessage> innerEntry : 
entry.getValue().entrySet()){
+        UdpPerNodeDataMessage message = new UdpPerNodeDataMessage();
+        message.setUuid(UUID.randomUUID().toString());
+        message.setUriFrom(me.getId());
+        message.setExpireAt(innerEntry.getValue().getExpireAt());
+        message.setKey(innerEntry.getValue().getKey());
+        message.setNodeId(innerEntry.getValue().getNodeId());
+        message.setTimestamp(innerEntry.getValue().getTimestamp());
+        message.setPayload(innerEntry.getValue().getPayload());
+        gossipCore.sendOneWay(message, member.getUri());   
+      }
+    }
+    sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime);
+  }
+    
+  /**
+   * Performs the sending of the membership list, after we have incremented 
our own heartbeat.
+   */
+  protected void sendMembershipList(LocalMember me, LocalMember member) {
+    if (member == null){
+      return;
+    }
+    long startTime = System.currentTimeMillis();
+    me.setHeartbeat(System.nanoTime());
+    UdpActiveGossipMessage message = new UdpActiveGossipMessage();
+    message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
+    message.setUuid(UUID.randomUUID().toString());
+    message.getMembers().add(convert(me));
+    for (LocalMember other : gossipManager.getMembers().keySet()) {
+      message.getMembers().add(convert(other));
+    }
+    Response r = gossipCore.send(message, member.getUri());
+    if (r instanceof ActiveGossipOk){
+      //maybe count metrics here
+    } else {
+      LOGGER.debug("Message " + message + " generated response " + r);
+    }
+    sendMembershipHistorgram.update(System.currentTimeMillis() - startTime);
+  }
+    
+  protected final Member convert(LocalMember member){
+    Member gm = new Member();
+    gm.setCluster(member.getClusterName());
+    gm.setHeartbeat(member.getHeartbeat());
+    gm.setUri(member.getUri().toASCIIString());
+    gm.setId(member.getId());
+    gm.setProperties(member.getProperties());
+    return gm;
+  }
+  
+  /**
+   * 
+   * @param memberList
+   *          An immutable list
+   * @return The chosen LocalGossipMember to gossip with.
+   */
+  protected LocalMember selectPartner(List<LocalMember> memberList) {
+    LocalMember member = null;
+    if (memberList.size() > 0) {
+      int randomNeighborIndex = random.nextInt(memberList.size());
+      member = memberList.get(randomNeighborIndex);
+    }
+    return member;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/manager/Clock.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/Clock.java 
b/gossip-base/src/main/java/org/apache/gossip/manager/Clock.java
new file mode 100644
index 0000000..6629c62
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/Clock.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+public interface Clock {
+
+  long currentTimeMillis();
+  long nanoTime();
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/manager/DataReaper.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/main/java/org/apache/gossip/manager/DataReaper.java 
b/gossip-base/src/main/java/org/apache/gossip/manager/DataReaper.java
new file mode 100644
index 0000000..8175a1b
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/DataReaper.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.gossip.model.PerNodeDataMessage;
+import org.apache.gossip.model.SharedDataMessage;
+
+/**
+ * We wish to periodically sweep user data and remove entries past their 
timestamp. This
+ * implementation periodically sweeps through the data and removes old 
entries. While it might make
+ * sense to use a more specific high performance data-structure to handle 
eviction, keep in mind
+ * that we are not looking to store a large quantity of data as we currently 
have to transmit this
+ * data cluster wide.
+ */
+public class DataReaper {
+
+  private final GossipCore gossipCore;
+  private final ScheduledExecutorService scheduledExecutor = 
Executors.newScheduledThreadPool(1);
+  private final Clock clock;
+  
+  public DataReaper(GossipCore gossipCore, Clock clock){
+    this.gossipCore = gossipCore;
+    this.clock = clock;
+  }
+  
+  public void init(){
+    Runnable reapPerNodeData = () -> {
+      runPerNodeOnce();
+      runSharedOnce();
+    };
+    scheduledExecutor.scheduleAtFixedRate(reapPerNodeData, 0, 5, 
TimeUnit.SECONDS);
+  }
+  
+  void runSharedOnce(){
+    for (Entry<String, SharedDataMessage> entry : 
gossipCore.getSharedData().entrySet()){
+      if (entry.getValue().getExpireAt() < clock.currentTimeMillis()){
+        gossipCore.getSharedData().remove(entry.getKey(), entry.getValue());
+      }
+    }
+  }
+  
+  void runPerNodeOnce(){
+    for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> node : 
gossipCore.getPerNodeData().entrySet()){
+      reapData(node.getValue());
+    }
+  }
+  
+  void reapData(ConcurrentHashMap<String, PerNodeDataMessage> 
concurrentHashMap){
+    for (Entry<String, PerNodeDataMessage> entry : 
concurrentHashMap.entrySet()){
+      if (entry.getValue().getExpireAt() < clock.currentTimeMillis()){
+        concurrentHashMap.remove(entry.getKey(), entry.getValue());
+      }
+    }
+  }
+  
+  public void close(){
+    scheduledExecutor.shutdown();
+    try {
+      scheduledExecutor.awaitTermination(1, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java
 
b/gossip-base/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java
new file mode 100644
index 0000000..2f489a2
--- /dev/null
+++ 
b/gossip-base/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.gossip.LocalMember;
+
+import com.codahale.metrics.MetricRegistry;
+
+/**
+ * Sends gossip traffic at different rates to other racks and data-centers.
+ * This implementation controls the rate at which gossip traffic is shared. 
+ * There are two constructs Datacenter and Rack. It is assumed that bandwidth 
and latency is higher
+ * in the rack than in the the datacenter. We can adjust the rate at which we 
send messages to each group.
+ * 
+ */
+public class DatacenterRackAwareActiveGossiper extends AbstractActiveGossiper {
+
+  public static final String DATACENTER = "datacenter";
+  public static final String RACK = "rack";
+  
+  private int sameRackGossipIntervalMs = 100;
+  private int sameDcGossipIntervalMs = 500;
+  private int differentDatacenterGossipIntervalMs = 1000;
+  private int randomDeadMemberSendIntervalMs = 250;
+  
+  private ScheduledExecutorService scheduledExecutorService;
+  private final BlockingQueue<Runnable> workQueue;
+  private ThreadPoolExecutor threadService;
+  
+  public DatacenterRackAwareActiveGossiper(GossipManager gossipManager, 
GossipCore gossipCore,
+          MetricRegistry registry) {
+    super(gossipManager, gossipCore, registry);
+    scheduledExecutorService = Executors.newScheduledThreadPool(2);
+    workQueue = new ArrayBlockingQueue<Runnable>(1024);
+    threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, 
workQueue,
+            new ThreadPoolExecutor.DiscardOldestPolicy());
+    try {
+      sameRackGossipIntervalMs = Integer.parseInt(gossipManager.getSettings()
+              .getActiveGossipProperties().get("sameRackGossipIntervalMs"));
+    } catch (RuntimeException ex) { }
+    try {
+      sameDcGossipIntervalMs = Integer.parseInt(gossipManager.getSettings()
+              .getActiveGossipProperties().get("sameDcGossipIntervalMs"));
+    } catch (RuntimeException ex) { }
+    try {
+      differentDatacenterGossipIntervalMs = 
Integer.parseInt(gossipManager.getSettings()
+              
.getActiveGossipProperties().get("differentDatacenterGossipIntervalMs"));
+    } catch (RuntimeException ex) { }
+    try {
+      randomDeadMemberSendIntervalMs = 
Integer.parseInt(gossipManager.getSettings()
+              
.getActiveGossipProperties().get("randomDeadMemberSendIntervalMs"));
+    } catch (RuntimeException ex) { }
+  }
+
+  @Override
+  public void init() {
+    super.init();
+    //same rack
+    scheduledExecutorService.scheduleAtFixedRate(() -> 
+      threadService.execute(() -> sendToSameRackMember()), 
+      0, sameRackGossipIntervalMs, TimeUnit.MILLISECONDS);
+    
+    scheduledExecutorService.scheduleAtFixedRate(() -> 
+      threadService.execute(() -> sendToSameRackMemberPerNode()), 
+      0, sameRackGossipIntervalMs, TimeUnit.MILLISECONDS);
+    
+    scheduledExecutorService.scheduleAtFixedRate(() -> 
+      threadService.execute(() -> sendToSameRackShared()), 
+      0, sameRackGossipIntervalMs, TimeUnit.MILLISECONDS);
+    
+    //same dc different rack
+    scheduledExecutorService.scheduleAtFixedRate(() -> 
+      threadService.execute(() -> sameDcDiffernetRackMember()), 
+      0, sameDcGossipIntervalMs, TimeUnit.MILLISECONDS);
+    
+    scheduledExecutorService.scheduleAtFixedRate(() -> 
+    threadService.execute(() -> sameDcDiffernetRackPerNode()), 
+    0, sameDcGossipIntervalMs, TimeUnit.MILLISECONDS);
+    
+    scheduledExecutorService.scheduleAtFixedRate(() -> 
+    threadService.execute(() -> sameDcDiffernetRackShared()), 
+    0, sameDcGossipIntervalMs, TimeUnit.MILLISECONDS);
+    
+    //different dc
+    scheduledExecutorService.scheduleAtFixedRate(() -> 
+      threadService.execute(() -> differentDcMember()), 
+      0, differentDatacenterGossipIntervalMs, TimeUnit.MILLISECONDS);
+    
+    scheduledExecutorService.scheduleAtFixedRate(() -> 
+    threadService.execute(() -> differentDcPerNode()), 
+    0, differentDatacenterGossipIntervalMs, TimeUnit.MILLISECONDS);
+  
+    scheduledExecutorService.scheduleAtFixedRate(() -> 
+    threadService.execute(() -> differentDcShared()), 
+    0, differentDatacenterGossipIntervalMs, TimeUnit.MILLISECONDS);
+    
+    //the dead
+    scheduledExecutorService.scheduleAtFixedRate(() -> 
+      threadService.execute(() -> sendToDeadMember()), 
+      0, randomDeadMemberSendIntervalMs, TimeUnit.MILLISECONDS);
+    
+  }
+
+  private void sendToDeadMember() {
+    sendMembershipList(gossipManager.getMyself(), 
selectPartner(gossipManager.getDeadMembers()));
+  }
+  
+  private List<LocalMember> differentDataCenter(){
+    String myDc = gossipManager.getMyself().getProperties().get(DATACENTER);
+    String rack = gossipManager.getMyself().getProperties().get(RACK);
+    if (myDc == null|| rack == null){
+      return Collections.emptyList();
+    }
+    List<LocalMember> notMyDc = new ArrayList<LocalMember>(10);
+    for (LocalMember i : gossipManager.getLiveMembers()){
+      if (!myDc.equals(i.getProperties().get(DATACENTER))){
+        notMyDc.add(i);
+      }
+    }
+    return notMyDc;
+  }
+  
+  private List<LocalMember> sameDatacenterDifferentRack(){
+    String myDc = gossipManager.getMyself().getProperties().get(DATACENTER);
+    String rack = gossipManager.getMyself().getProperties().get(RACK);
+    if (myDc == null|| rack == null){
+      return Collections.emptyList();
+    }
+    List<LocalMember> notMyDc = new ArrayList<LocalMember>(10);
+    for (LocalMember i : gossipManager.getLiveMembers()){
+      if (myDc.equals(i.getProperties().get(DATACENTER)) && 
!rack.equals(i.getProperties().get(RACK))){
+        notMyDc.add(i);
+      }
+    }
+    return notMyDc;
+  }
+    
+  private List<LocalMember> sameRackNodes(){
+    String myDc = gossipManager.getMyself().getProperties().get(DATACENTER);
+    String rack = gossipManager.getMyself().getProperties().get(RACK);
+    if (myDc == null|| rack == null){
+      return Collections.emptyList();
+    }
+    List<LocalMember> sameDcAndRack = new ArrayList<LocalMember>(10);
+    for (LocalMember i : gossipManager.getLiveMembers()){
+      if (myDc.equals(i.getProperties().get(DATACENTER))
+              && rack.equals(i.getProperties().get(RACK))){
+        sameDcAndRack.add(i);
+      }
+    }
+    return sameDcAndRack;
+  }
+
+  private void sendToSameRackMember() {
+    LocalMember i = selectPartner(sameRackNodes());
+    sendMembershipList(gossipManager.getMyself(), i);
+  }
+  
+  private void sendToSameRackMemberPerNode() {
+    sendPerNodeData(gossipManager.getMyself(), selectPartner(sameRackNodes()));
+  }
+  
+  private void sendToSameRackShared() {
+    sendSharedData(gossipManager.getMyself(), selectPartner(sameRackNodes()));
+  }
+  
+  private void differentDcMember() {
+    sendMembershipList(gossipManager.getMyself(), 
selectPartner(differentDataCenter()));
+  }
+  
+  private void differentDcPerNode() {
+    sendPerNodeData(gossipManager.getMyself(), 
selectPartner(differentDataCenter()));
+  }
+  
+  private void differentDcShared() {
+    sendSharedData(gossipManager.getMyself(), 
selectPartner(differentDataCenter()));
+  }
+  
+  private void sameDcDiffernetRackMember() {
+    sendMembershipList(gossipManager.getMyself(), 
selectPartner(sameDatacenterDifferentRack()));
+  }
+  
+  private void sameDcDiffernetRackPerNode() {
+    sendPerNodeData(gossipManager.getMyself(), 
selectPartner(sameDatacenterDifferentRack()));
+  }
+  
+  private void sameDcDiffernetRackShared() {
+    sendSharedData(gossipManager.getMyself(), 
selectPartner(sameDatacenterDifferentRack()));
+  }
+  
+  @Override
+  public void shutdown() {
+    super.shutdown();
+    scheduledExecutorService.shutdown();
+    try {
+      scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOGGER.debug("Issue during shutdown", e);
+    }
+    sendShutdownMessage();
+    threadService.shutdown();
+    try {
+      threadService.awaitTermination(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOGGER.debug("Issue during shutdown", e);
+    }
+  }
+  
+  /**
+   * sends an optimistic shutdown message to several clusters nodes
+   */
+  protected void sendShutdownMessage(){
+    List<LocalMember> l = gossipManager.getLiveMembers();
+    int sendTo = l.size() < 3 ? 1 : l.size() / 3;
+    for (int i = 0; i < sendTo; i++) {
+      threadService.execute(() -> 
sendShutdownMessage(gossipManager.getMyself(), selectPartner(l)));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java 
b/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java
new file mode 100644
index 0000000..f53419d
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.gossip.Member;
+import org.apache.gossip.LocalMember;
+import org.apache.gossip.RemoteMember;
+import org.apache.gossip.crdt.Crdt;
+import org.apache.gossip.event.GossipState;
+import org.apache.gossip.model.*;
+import org.apache.gossip.udp.Trackable;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.URI;
+import java.security.*;
+import java.security.spec.InvalidKeySpecException;
+import java.security.spec.PKCS8EncodedKeySpec;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.*;
+
+public class GossipCore implements GossipCoreConstants {
+
+  class LatchAndBase {
+    private final CountDownLatch latch;
+    private volatile Base base;
+    
+    LatchAndBase(){
+      latch = new CountDownLatch(1);
+    }
+    
+  }
+  public static final Logger LOGGER = Logger.getLogger(GossipCore.class);
+  private final GossipManager gossipManager;
+  private ConcurrentHashMap<String, LatchAndBase> requests;
+  private ThreadPoolExecutor service;
+  private final ConcurrentHashMap<String, ConcurrentHashMap<String, 
PerNodeDataMessage>> perNodeData;
+  private final ConcurrentHashMap<String, SharedDataMessage> sharedData;
+  private final BlockingQueue<Runnable> workQueue;
+  private final PKCS8EncodedKeySpec privKeySpec;
+  private final PrivateKey privKey;
+  private final Meter messageSerdeException;
+  private final Meter tranmissionException;
+  private final Meter tranmissionSuccess;
+
+  public GossipCore(GossipManager manager, MetricRegistry metrics){
+    this.gossipManager = manager;
+    requests = new ConcurrentHashMap<>();
+    workQueue = new ArrayBlockingQueue<>(1024);
+    service = new ThreadPoolExecutor(1, 5, 1, TimeUnit.SECONDS, workQueue, new 
ThreadPoolExecutor.DiscardOldestPolicy());
+    perNodeData = new ConcurrentHashMap<>();
+    sharedData = new ConcurrentHashMap<>();
+    metrics.register(WORKQUEUE_SIZE, (Gauge<Integer>)() -> workQueue.size());
+    metrics.register(PER_NODE_DATA_SIZE, (Gauge<Integer>)() -> 
perNodeData.size());
+    metrics.register(SHARED_DATA_SIZE, (Gauge<Integer>)() ->  
sharedData.size());
+    metrics.register(REQUEST_SIZE, (Gauge<Integer>)() ->  requests.size());
+    metrics.register(THREADPOOL_ACTIVE, (Gauge<Integer>)() ->  
service.getActiveCount());
+    metrics.register(THREADPOOL_SIZE, (Gauge<Integer>)() ->  
service.getPoolSize());
+    messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION);
+    tranmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION);
+    tranmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS);
+
+    if (manager.getSettings().isSignMessages()){
+      File privateKey = new File(manager.getSettings().getPathToKeyStore(), 
manager.getMyself().getId());
+      File publicKey = new File(manager.getSettings().getPathToKeyStore(), 
manager.getMyself().getId() + ".pub");
+      if (!privateKey.exists()){
+        throw new IllegalArgumentException("private key not found " + 
privateKey);
+      }
+      if (!publicKey.exists()){
+        throw new IllegalArgumentException("public key not found " + 
publicKey);
+      }
+      try (FileInputStream keyfis = new FileInputStream(privateKey)) {
+        byte[] encKey = new byte[keyfis.available()];
+        keyfis.read(encKey);
+        keyfis.close();
+        privKeySpec = new PKCS8EncodedKeySpec(encKey);
+        KeyFactory keyFactory = KeyFactory.getInstance("DSA");
+        privKey = keyFactory.generatePrivate(privKeySpec);
+      } catch (NoSuchAlgorithmException | InvalidKeySpecException | 
IOException e) {
+        throw new RuntimeException("failed hard", e);
+      }
+    } else {
+      privKeySpec = null;
+      privKey = null;
+    }
+  }
+
+  private byte [] sign(byte [] bytes){
+    Signature dsa;
+    try {
+      dsa = Signature.getInstance("SHA1withDSA", "SUN");
+      dsa.initSign(privKey);
+      dsa.update(bytes);
+      return dsa.sign();
+    } catch (NoSuchAlgorithmException | NoSuchProviderException | 
InvalidKeyException | SignatureException e) {
+      throw new RuntimeException(e);
+    } 
+  }
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  public void addSharedData(SharedDataMessage message) {
+    while (true){
+      SharedDataMessage previous = sharedData.putIfAbsent(message.getKey(), 
message);
+      if (previous == null){
+        return;
+      }
+      if (message.getPayload() instanceof Crdt){
+        SharedDataMessage merged = new SharedDataMessage();
+        merged.setExpireAt(message.getExpireAt());
+        merged.setKey(message.getKey());
+        merged.setNodeId(message.getNodeId());
+        merged.setTimestamp(message.getTimestamp());
+        Crdt mergedCrdt = ((Crdt) previous.getPayload()).merge((Crdt) 
message.getPayload());
+        merged.setPayload(mergedCrdt);
+        boolean replaced = sharedData.replace(message.getKey(), previous, 
merged);
+        if (replaced){
+          return;
+        }
+      } else {
+        if (previous.getTimestamp() < message.getTimestamp()){
+          boolean result = sharedData.replace(message.getKey(), previous, 
message);
+          if (result){
+            return;
+          }
+        } else {
+          return;
+        }
+      }
+    }
+  }
+  
+  public void addPerNodeData(PerNodeDataMessage message){
+    ConcurrentHashMap<String,PerNodeDataMessage> nodeMap = new 
ConcurrentHashMap<>();
+    nodeMap.put(message.getKey(), message);
+    nodeMap = perNodeData.putIfAbsent(message.getNodeId(), nodeMap);
+    if (nodeMap != null){
+      PerNodeDataMessage current = nodeMap.get(message.getKey());
+      if (current == null){
+        nodeMap.putIfAbsent(message.getKey(), message);
+      } else {
+        if (current.getTimestamp() < message.getTimestamp()){
+          nodeMap.replace(message.getKey(), current, message);
+        }
+      }
+    }
+  }
+
+  public ConcurrentHashMap<String, ConcurrentHashMap<String, 
PerNodeDataMessage>> getPerNodeData(){
+    return perNodeData;
+  }
+
+  public ConcurrentHashMap<String, SharedDataMessage> getSharedData() {
+    return sharedData;
+  }
+
+  public void shutdown(){
+    service.shutdown();
+    try {
+      service.awaitTermination(1, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOGGER.warn(e);
+    }
+    service.shutdownNow();
+  }
+
+  public void receive(Base base) {
+    if (!gossipManager.getMessageInvoker().invoke(this, gossipManager, base)) {
+      LOGGER.warn("received message can not be handled");
+    }
+  }
+
+  /**
+   * Sends a blocking message.
+   * @param message
+   * @param uri
+   * @throws RuntimeException if data can not be serialized or in transmission 
error
+   */
+  private void sendInternal(Base message, URI uri){
+    byte[] json_bytes;
+    try {
+      if (privKey == null){
+        json_bytes = 
gossipManager.getObjectMapper().writeValueAsBytes(message);
+      } else {
+        SignedPayload p = new SignedPayload();
+        
p.setData(gossipManager.getObjectMapper().writeValueAsString(message).getBytes());
+        p.setSignature(sign(p.getData()));
+        json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(p);
+      }
+    } catch (IOException e) {
+      messageSerdeException.mark();
+      throw new RuntimeException(e);
+    }
+    try (DatagramSocket socket = new DatagramSocket()) {
+      socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2);
+      InetAddress dest = InetAddress.getByName(uri.getHost());
+      DatagramPacket datagramPacket = new DatagramPacket(json_bytes, 
json_bytes.length, dest, uri.getPort());
+      socket.send(datagramPacket);
+      tranmissionSuccess.mark();
+    } catch (IOException e) {
+      tranmissionException.mark();
+      throw new RuntimeException(e);
+    }
+  }
+
+  public Response send(Base message, URI uri){
+    if (LOGGER.isDebugEnabled()){
+      LOGGER.debug("Sending " + message);
+      LOGGER.debug("Current request queue " + requests);
+    }
+
+    final Trackable t;
+    LatchAndBase latchAndBase = null;
+    if (message instanceof Trackable){
+      t = (Trackable) message;
+      latchAndBase = new LatchAndBase();
+      requests.put(t.getUuid() + "/" + t.getUriFrom(), latchAndBase);
+    } else {
+      t = null;
+    }
+    sendInternal(message, uri);
+    if (latchAndBase == null){
+      return null;
+    } 
+    
+    try {
+      boolean complete = latchAndBase.latch.await(1, TimeUnit.SECONDS);
+      if (complete){
+        return (Response) latchAndBase.base;
+      } else{
+        return null;
+      }
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    } finally {
+      if (latchAndBase != null){
+        requests.remove(t.getUuid() + "/" + t.getUriFrom());
+      }
+    }
+  }
+
+  /**
+   * Sends a message across the network while blocking. Catches and ignores 
IOException in transmission. Used
+   * when the protocol for the message is not to wait for a response
+   * @param message the message to send
+   * @param u the uri to send it to
+   */
+  public void sendOneWay(Base message, URI u){
+    byte[] json_bytes;
+    try {
+      if (privKey == null){
+        json_bytes = 
gossipManager.getObjectMapper().writeValueAsBytes(message);
+      } else {
+        SignedPayload p = new SignedPayload();
+        
p.setData(gossipManager.getObjectMapper().writeValueAsString(message).getBytes());
+        p.setSignature(sign(p.getData()));
+        json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(p);
+      }
+    } catch (IOException e) {
+      messageSerdeException.mark();
+      throw new RuntimeException(e);
+    }
+    try (DatagramSocket socket = new DatagramSocket()) {
+      socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2);
+      InetAddress dest = InetAddress.getByName(u.getHost());
+      DatagramPacket datagramPacket = new DatagramPacket(json_bytes, 
json_bytes.length, dest, u.getPort());
+      socket.send(datagramPacket);
+      tranmissionSuccess.mark();
+    } catch (IOException ex) {
+      tranmissionException.mark();
+      LOGGER.debug("Send one way failed", ex);
+    }
+  }
+
+  public void handleResponse(String k, Base v) {
+    LatchAndBase latch = requests.get(k);
+    latch.base = v;
+    latch.latch.countDown();
+  }
+
+  /**
+   * Merge lists from remote members and update heartbeats
+   *
+   * @param gossipManager
+   * @param senderMember
+   * @param remoteList
+   *
+   */
+  public void mergeLists(GossipManager gossipManager, RemoteMember 
senderMember,
+          List<Member> remoteList) {
+    if (LOGGER.isDebugEnabled()){
+      debugState(senderMember, remoteList);
+    }
+    for (LocalMember i : gossipManager.getDeadMembers()) {
+      if (i.getId().equals(senderMember.getId())) {
+        LOGGER.debug(gossipManager.getMyself() + " contacted by dead member " 
+ senderMember.getUri());
+        i.recordHeartbeat(senderMember.getHeartbeat());
+        i.setHeartbeat(senderMember.getHeartbeat());
+        //TODO consider forcing an UP here
+      }
+    }
+    for (Member remoteMember : remoteList) {
+      if (remoteMember.getId().equals(gossipManager.getMyself().getId())) {
+        continue;
+      }
+      LocalMember aNewMember = new LocalMember(remoteMember.getClusterName(),
+      remoteMember.getUri(),
+      remoteMember.getId(),
+      remoteMember.getHeartbeat(),
+      remoteMember.getProperties(),
+      gossipManager.getSettings().getWindowSize(),
+      gossipManager.getSettings().getMinimumSamples(),
+      gossipManager.getSettings().getDistribution());
+      aNewMember.recordHeartbeat(remoteMember.getHeartbeat());
+      Object result = gossipManager.getMembers().putIfAbsent(aNewMember, 
GossipState.UP);
+      if (result != null){
+        for (Entry<LocalMember, GossipState> localMember : 
gossipManager.getMembers().entrySet()){
+          if (localMember.getKey().getId().equals(remoteMember.getId())){
+            localMember.getKey().recordHeartbeat(remoteMember.getHeartbeat());
+            localMember.getKey().setHeartbeat(remoteMember.getHeartbeat());
+            localMember.getKey().setProperties(remoteMember.getProperties());
+          }
+        }
+      }
+    }
+    if (LOGGER.isDebugEnabled()){
+      debugState(senderMember, remoteList);
+    }
+  }
+
+  private void debugState(RemoteMember senderMember,
+          List<Member> remoteList){
+    LOGGER.warn(
+          "-----------------------\n" +
+          "Me " + gossipManager.getMyself() + "\n" +
+          "Sender " + senderMember + "\n" +
+          "RemoteList " + remoteList + "\n" +
+          "Live " + gossipManager.getLiveMembers()+ "\n" +
+          "Dead " + gossipManager.getDeadMembers()+ "\n" +
+          "=======================");
+  }
+
+  @SuppressWarnings("rawtypes")
+  public Crdt merge(SharedDataMessage message) {
+    for (;;){
+      SharedDataMessage previous = sharedData.putIfAbsent(message.getKey(), 
message);
+      if (previous == null){
+        return (Crdt) message.getPayload();
+      }
+      SharedDataMessage copy = new SharedDataMessage();
+      copy.setExpireAt(message.getExpireAt());
+      copy.setKey(message.getKey());
+      copy.setNodeId(message.getNodeId());
+      copy.setTimestamp(message.getTimestamp());
+      @SuppressWarnings("unchecked")
+      Crdt merged = ((Crdt) previous.getPayload()).merge((Crdt) 
message.getPayload());
+      copy.setPayload(merged);
+      boolean replaced = sharedData.replace(message.getKey(), previous, copy);
+      if (replaced){
+        return merged;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/manager/GossipCoreConstants.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/main/java/org/apache/gossip/manager/GossipCoreConstants.java 
b/gossip-base/src/main/java/org/apache/gossip/manager/GossipCoreConstants.java
new file mode 100644
index 0000000..6d3765a
--- /dev/null
+++ 
b/gossip-base/src/main/java/org/apache/gossip/manager/GossipCoreConstants.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+public interface GossipCoreConstants {
+  String WORKQUEUE_SIZE = "gossip.core.workqueue.size";
+  String PER_NODE_DATA_SIZE = "gossip.core.pernodedata.size"; 
+  String SHARED_DATA_SIZE = "gossip.core.shareddata.size";
+  String REQUEST_SIZE = "gossip.core.requests.size";
+  String THREADPOOL_ACTIVE = "gossip.core.threadpool.active";
+  String THREADPOOL_SIZE = "gossip.core.threadpool.size";
+  String MESSAGE_SERDE_EXCEPTION = "gossip.core.message_serde_exception";
+  String MESSAGE_TRANSMISSION_EXCEPTION = 
"gossip.core.message_transmission_exception";
+  String MESSAGE_TRANSMISSION_SUCCESS = 
"gossip.core.message_transmission_success";
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java 
b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
new file mode 100644
index 0000000..c2b50ae
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+import com.codahale.metrics.MetricRegistry;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.LocalMember;
+import org.apache.gossip.Member;
+import org.apache.gossip.crdt.Crdt;
+import org.apache.gossip.event.GossipListener;
+import org.apache.gossip.event.GossipState;
+import org.apache.gossip.manager.handlers.MessageInvoker;
+import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
+import org.apache.gossip.model.PerNodeDataMessage;
+import org.apache.gossip.model.SharedDataMessage;
+import org.apache.log4j.Logger;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+public abstract class GossipManager {
+
+  public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
+
+  private final ConcurrentSkipListMap<LocalMember, GossipState> members;
+  private final LocalMember me;
+  private final GossipSettings settings;
+  private final AtomicBoolean gossipServiceRunning;
+  private AbstractActiveGossiper activeGossipThread;
+  private PassiveGossipThread passiveGossipThread;
+  private ExecutorService gossipThreadExecutor;
+  private final GossipCore gossipCore;
+  private final DataReaper dataReaper;
+  private final Clock clock;
+  private final ScheduledExecutorService scheduledServiced;
+  private final MetricRegistry registry;
+  private final RingStatePersister ringState;
+  private final UserDataPersister userDataState;
+  private final GossipMemberStateRefresher memberStateRefresher;
+  private final ObjectMapper objectMapper;
+
+  private final MessageInvoker messageInvoker;
+
+  public GossipManager(String cluster,
+                       URI uri, String id, Map<String, String> properties, 
GossipSettings settings,
+                       List<Member> gossipMembers, GossipListener listener, 
MetricRegistry registry,
+                       ObjectMapper objectMapper, MessageInvoker 
messageInvoker) {
+    this.settings = settings;
+    this.messageInvoker = messageInvoker;
+
+    clock = new SystemClock();
+    me = new LocalMember(cluster, uri, id, clock.nanoTime(), properties,
+            settings.getWindowSize(), settings.getMinimumSamples(), 
settings.getDistribution());
+    gossipCore = new GossipCore(this, registry);
+    dataReaper = new DataReaper(gossipCore, clock);
+    members = new ConcurrentSkipListMap<>();
+    for (Member startupMember : gossipMembers) {
+      if (!startupMember.equals(me)) {
+        LocalMember member = new LocalMember(startupMember.getClusterName(),
+                startupMember.getUri(), startupMember.getId(),
+                clock.nanoTime(), startupMember.getProperties(), 
settings.getWindowSize(),
+                settings.getMinimumSamples(), settings.getDistribution());
+        //TODO should members start in down state?
+        members.put(member, GossipState.DOWN);
+      }
+    }
+    gossipThreadExecutor = Executors.newCachedThreadPool();
+    gossipServiceRunning = new AtomicBoolean(true);
+    this.scheduledServiced = Executors.newScheduledThreadPool(1);
+    this.registry = registry;
+    this.ringState = new RingStatePersister(this);
+    this.userDataState = new UserDataPersister(this, this.gossipCore);
+    this.memberStateRefresher = new GossipMemberStateRefresher(members, 
settings, listener, this::findPerNodeGossipData);
+    this.objectMapper = objectMapper;
+    readSavedRingState();
+    readSavedDataState();
+  }
+
+  public MessageInvoker getMessageInvoker() {
+    return messageInvoker;
+  }
+
+  public ConcurrentSkipListMap<LocalMember, GossipState> getMembers() {
+    return members;
+  }
+
+  public GossipSettings getSettings() {
+    return settings;
+  }
+
+  /**
+   * @return a read only list of members found in the DOWN state.
+   */
+  public List<LocalMember> getDeadMembers() {
+    return Collections.unmodifiableList(
+            members.entrySet()
+                    .stream()
+                    .filter(entry -> GossipState.DOWN.equals(entry.getValue()))
+                    .map(Entry::getKey).collect(Collectors.toList()));
+  }
+
+  /**
+   *
+   * @return a read only list of members found in the UP state
+   */
+  public List<LocalMember> getLiveMembers() {
+    return Collections.unmodifiableList(
+            members.entrySet()
+                    .stream()
+                    .filter(entry -> GossipState.UP.equals(entry.getValue()))
+                    .map(Entry::getKey).collect(Collectors.toList()));
+  }
+
+  public LocalMember getMyself() {
+    return me;
+  }
+
+  private AbstractActiveGossiper constructActiveGossiper(){
+    try {
+      Constructor<?> c = 
Class.forName(settings.getActiveGossipClass()).getConstructor(GossipManager.class,
 GossipCore.class, MetricRegistry.class);
+      return (AbstractActiveGossiper) c.newInstance(this, gossipCore, 
registry);
+    } catch (NoSuchMethodException | SecurityException | 
ClassNotFoundException | InstantiationException | IllegalAccessException | 
IllegalArgumentException | InvocationTargetException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Starts the client. Specifically, start the various cycles for this 
protocol. Start the gossip
+   * thread and start the receiver thread.
+   */
+  public void init() {
+    passiveGossipThread = new OnlyProcessReceivedPassiveGossipThread(this, 
gossipCore);
+    gossipThreadExecutor.execute(passiveGossipThread);
+    activeGossipThread = constructActiveGossiper();
+    activeGossipThread.init();
+    dataReaper.init();
+    scheduledServiced.scheduleAtFixedRate(ringState, 60, 60, TimeUnit.SECONDS);
+    scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, 
TimeUnit.SECONDS);
+    scheduledServiced.scheduleAtFixedRate(memberStateRefresher, 0, 100, 
TimeUnit.MILLISECONDS);
+    LOGGER.debug("The GossipManager is started.");
+  }
+
+  private void readSavedRingState() {
+    for (LocalMember l : ringState.readFromDisk()){
+      LocalMember member = new LocalMember(l.getClusterName(),
+              l.getUri(), l.getId(),
+              clock.nanoTime(), l.getProperties(), settings.getWindowSize(),
+              settings.getMinimumSamples(), settings.getDistribution());
+      members.putIfAbsent(member, GossipState.DOWN);
+    }
+  }
+
+  private void readSavedDataState() {
+    for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> l : 
userDataState.readPerNodeFromDisk().entrySet()){
+      for (Entry<String, PerNodeDataMessage> j : l.getValue().entrySet()){
+        gossipCore.addPerNodeData(j.getValue());
+      }
+    }
+    for (Entry<String, SharedDataMessage> l: 
userDataState.readSharedDataFromDisk().entrySet()){
+      gossipCore.addSharedData(l.getValue());
+    }
+  }
+
+  /**
+   * Shutdown the gossip service.
+   */
+  public void shutdown() {
+    gossipServiceRunning.set(false);
+    gossipThreadExecutor.shutdown();
+    gossipCore.shutdown();
+    dataReaper.close();
+    if (passiveGossipThread != null) {
+      passiveGossipThread.shutdown();
+    }
+    if (activeGossipThread != null) {
+      activeGossipThread.shutdown();
+    }
+    try {
+      boolean result = gossipThreadExecutor.awaitTermination(10, 
TimeUnit.MILLISECONDS);
+      if (!result) {
+        LOGGER.error("executor shutdown timed out");
+      }
+    } catch (InterruptedException e) {
+      LOGGER.error(e);
+    }
+    gossipThreadExecutor.shutdownNow();
+    scheduledServiced.shutdown();
+    try {
+      scheduledServiced.awaitTermination(1, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOGGER.error(e);
+    }
+    scheduledServiced.shutdownNow();
+  }
+
+  public void gossipPerNodeData(PerNodeDataMessage message){
+    Objects.nonNull(message.getKey());
+    Objects.nonNull(message.getTimestamp());
+    Objects.nonNull(message.getPayload());
+    message.setNodeId(me.getId());
+    gossipCore.addPerNodeData(message);
+  }
+
+  public void gossipSharedData(SharedDataMessage message){
+    Objects.nonNull(message.getKey());
+    Objects.nonNull(message.getTimestamp());
+    Objects.nonNull(message.getPayload());
+    message.setNodeId(me.getId());
+    gossipCore.addSharedData(message);
+  }
+
+
+  @SuppressWarnings("rawtypes")
+  public Crdt findCrdt(String key){
+    SharedDataMessage l = gossipCore.getSharedData().get(key);
+    if (l == null){
+      return null;
+    }
+    if (l.getExpireAt() < clock.currentTimeMillis()){
+      return null;
+    } else {
+      return (Crdt) l.getPayload();
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  public Crdt merge(SharedDataMessage message){
+    Objects.nonNull(message.getKey());
+    Objects.nonNull(message.getTimestamp());
+    Objects.nonNull(message.getPayload());
+    message.setNodeId(me.getId());
+    if (! (message.getPayload() instanceof Crdt)){
+      throw new IllegalArgumentException("Not a subclass of CRDT " + 
message.getPayload());
+    }
+    return gossipCore.merge(message);
+  }
+
+  public PerNodeDataMessage findPerNodeGossipData(String nodeId, String key){
+    ConcurrentHashMap<String, PerNodeDataMessage> j = 
gossipCore.getPerNodeData().get(nodeId);
+    if (j == null){
+      return null;
+    } else {
+      PerNodeDataMessage l = j.get(key);
+      if (l == null){
+        return null;
+      }
+      if (l.getExpireAt() != null && l.getExpireAt() < 
clock.currentTimeMillis()) {
+        return null;
+      }
+      return l;
+    }
+  }
+
+  public SharedDataMessage findSharedGossipData(String key){
+    SharedDataMessage l = gossipCore.getSharedData().get(key);
+    if (l == null){
+      return null;
+    }
+    if (l.getExpireAt() < clock.currentTimeMillis()){
+      return null;
+    } else {
+      return l;
+    }
+  }
+
+  public DataReaper getDataReaper() {
+    return dataReaper;
+  }
+
+  public RingStatePersister getRingState() {
+    return ringState;
+  }
+
+  public UserDataPersister getUserDataState() {
+    return userDataState;
+  }
+
+  public GossipMemberStateRefresher getMemberStateRefresher() {
+    return memberStateRefresher;
+  }
+
+  public Clock getClock() {
+    return clock;
+  }
+
+  public ObjectMapper getObjectMapper() {
+    return objectMapper;
+  }
+
+  public MetricRegistry getRegistry() {
+    return registry;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java 
b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
new file mode 100644
index 0000000..b87045b
--- /dev/null
+++ 
b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+import com.codahale.metrics.MetricRegistry;
+import com.fasterxml.jackson.core.JsonGenerator.Feature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.gossip.Member;
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.StartupSettings;
+import org.apache.gossip.crdt.CrdtModule;
+import org.apache.gossip.event.GossipListener;
+import org.apache.gossip.manager.handlers.DefaultMessageInvoker;
+import org.apache.gossip.manager.handlers.MessageInvoker;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class GossipManagerBuilder {
+
+  public static ManagerBuilder newBuilder() {
+    return new ManagerBuilder();
+  }
+
+  public static final class ManagerBuilder {
+    private String cluster;
+    private URI uri;
+    private String id;
+    private GossipSettings settings;
+    private List<Member> gossipMembers;
+    private GossipListener listener;
+    private MetricRegistry registry;
+    private Map<String,String> properties;
+    private ObjectMapper objectMapper;
+    private MessageInvoker messageInvoker;
+
+    private ManagerBuilder() {}
+
+    private void checkArgument(boolean check, String msg) {
+      if (!check) {
+        throw new IllegalArgumentException(msg);
+      }
+    }
+
+    public ManagerBuilder cluster(String cluster) {
+      this.cluster = cluster;
+      return this;
+    }
+    
+    public ManagerBuilder properties(Map<String,String> properties) {
+      this.properties = properties;
+      return this;
+    }
+
+    public ManagerBuilder id(String id) {
+      this.id = id;
+      return this;
+    }
+
+    public ManagerBuilder gossipSettings(GossipSettings settings) {
+      this.settings = settings;
+      return this;
+    }
+    
+    public ManagerBuilder startupSettings(StartupSettings startupSettings) {
+      this.cluster = startupSettings.getCluster();
+      this.id = startupSettings.getId();
+      this.settings = startupSettings.getGossipSettings();
+      this.gossipMembers = startupSettings.getGossipMembers();
+      this.uri = startupSettings.getUri();
+      return this;
+    }
+
+    public ManagerBuilder gossipMembers(List<Member> members) {
+      this.gossipMembers = members;
+      return this;
+    }
+
+    public ManagerBuilder listener(GossipListener listener) {
+      this.listener = listener;
+      return this;
+    }
+    
+    public ManagerBuilder registry(MetricRegistry registry) {
+      this.registry = registry;
+      return this;
+    }
+
+    public ManagerBuilder uri(URI uri){
+      this.uri = uri;
+      return this;
+    }
+    
+    public ManagerBuilder mapper(ObjectMapper objectMapper){
+      this.objectMapper = objectMapper;
+      return this;
+    }
+
+    public ManagerBuilder messageInvoker(MessageInvoker messageInvoker) {
+      this.messageInvoker = messageInvoker;
+      return this;
+    }
+
+    public GossipManager build() {
+      checkArgument(id != null, "You must specify an id");
+      checkArgument(cluster != null, "You must specify a cluster name");
+      checkArgument(settings != null, "You must specify gossip settings");
+      checkArgument(uri != null, "You must specify a uri");
+      if (registry == null){
+        registry = new MetricRegistry();
+      }
+      if (properties == null){
+        properties = new HashMap<String,String>();
+      }
+      if (listener == null){
+        listener((a,b) -> {});
+      }
+      if (gossipMembers == null) {
+        gossipMembers = new ArrayList<>();
+      }
+      if (objectMapper == null) {
+        objectMapper = new ObjectMapper();
+        objectMapper.enableDefaultTyping();
+        objectMapper.registerModule(new CrdtModule());
+        objectMapper.configure(Feature.WRITE_NUMBERS_AS_STRINGS, false);
+      }
+      if (messageInvoker == null) {
+        messageInvoker = new DefaultMessageInvoker();
+      } 
+      return new GossipManager(cluster, uri, id, properties, settings, 
gossipMembers, listener, registry, objectMapper, messageInvoker) {} ;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java
 
b/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java
new file mode 100644
index 0000000..1836309
--- /dev/null
+++ 
b/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gossip.manager;
+
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.LocalMember;
+import org.apache.gossip.event.GossipListener;
+import org.apache.gossip.event.GossipState;
+import org.apache.gossip.model.PerNodeDataMessage;
+import org.apache.gossip.model.ShutdownMessage;
+import org.apache.log4j.Logger;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+
+public class GossipMemberStateRefresher implements Runnable {
+  public static final Logger LOGGER = 
Logger.getLogger(GossipMemberStateRefresher.class);
+
+  private final Map<LocalMember, GossipState> members;
+  private final GossipSettings settings;
+  private final GossipListener listener;
+  private final Clock clock;
+  private final BiFunction<String, String, PerNodeDataMessage> 
findPerNodeGossipData;
+
+  public GossipMemberStateRefresher(Map<LocalMember, GossipState> members, 
GossipSettings settings,
+                                    GossipListener listener, 
BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData) {
+    this.members = members;
+    this.settings = settings;
+    this.listener = listener;
+    this.findPerNodeGossipData = findPerNodeGossipData;
+    clock = new SystemClock();
+  }
+
+  public void run() {
+    try {
+      runOnce();
+    } catch (RuntimeException ex) {
+      LOGGER.warn("scheduled state had exception", ex);
+    }
+  }
+
+  public void runOnce() {
+    for (Entry<LocalMember, GossipState> entry : members.entrySet()) {
+      boolean userDown = processOptimisticShutdown(entry);
+      if (userDown)
+        continue;
+
+      Double phiMeasure = entry.getKey().detect(clock.nanoTime());
+      GossipState requiredState;
+
+      if (phiMeasure != null) {
+        requiredState = calcRequiredState(phiMeasure);
+      } else {
+        requiredState = calcRequiredStateCleanupInterval(entry.getKey(), 
entry.getValue());
+      }
+
+      if (entry.getValue() != requiredState) {
+        members.put(entry.getKey(), requiredState);
+        listener.gossipEvent(entry.getKey(), requiredState);
+      }
+    }
+  }
+
+  public GossipState calcRequiredState(Double phiMeasure) {
+    if (phiMeasure > settings.getConvictThreshold())
+      return GossipState.DOWN;
+    else
+      return GossipState.UP;
+  }
+
+  public GossipState calcRequiredStateCleanupInterval(LocalMember member, 
GossipState state) {
+    long now = clock.nanoTime();
+    long nowInMillis = TimeUnit.MILLISECONDS.convert(now, 
TimeUnit.NANOSECONDS);
+    if (nowInMillis - settings.getCleanupInterval() > member.getHeartbeat()) {
+      return GossipState.DOWN;
+    } else {
+      return state;
+    }
+  }
+
+  /**
+   * If we have a special key the per-node data that means that the node has 
sent us
+   * a pre-emptive shutdown message. We process this so node is seen down 
sooner
+   *
+   * @param l member to consider
+   * @return true if node forced down
+   */
+  public boolean processOptimisticShutdown(Entry<LocalMember, GossipState> l) {
+    PerNodeDataMessage m = findPerNodeGossipData.apply(l.getKey().getId(), 
ShutdownMessage.PER_NODE_KEY);
+    if (m == null) {
+      return false;
+    }
+    ShutdownMessage s = (ShutdownMessage) m.getPayload();
+    if (s.getShutdownAtNanos() > l.getKey().getHeartbeat()) {
+      members.put(l.getKey(), GossipState.DOWN);
+      if (l.getValue() == GossipState.UP) {
+        listener.gossipEvent(l.getKey(), GossipState.DOWN);
+      }
+      return true;
+    }
+    return false;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipConstants.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipConstants.java
 
b/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipConstants.java
new file mode 100644
index 0000000..3bcc344
--- /dev/null
+++ 
b/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipConstants.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+public interface PassiveGossipConstants {
+  String SIGNED_MESSAGE = "gossip.passive.signed_message";
+  String UNSIGNED_MESSAGE = "gossip.passive.unsigned_message";
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java 
b/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
new file mode 100644
index 0000000..ae28bf7
--- /dev/null
+++ 
b/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.gossip.model.Base;
+import org.apache.gossip.model.SignedPayload;
+import org.apache.log4j.Logger;
+
+import com.codahale.metrics.Meter;
+
+/**
+ * This class handles the passive cycle,
+ * where this client has received an incoming message. 
+ */
+abstract public class PassiveGossipThread implements Runnable {
+
+  public static final Logger LOGGER = 
Logger.getLogger(PassiveGossipThread.class);
+
+  /** The socket used for the passive thread of the gossip service. */
+  private final DatagramSocket server;
+  private final AtomicBoolean keepRunning;
+  private final GossipCore gossipCore;
+  private final GossipManager gossipManager;
+  private final Meter signed;
+  private final Meter unsigned;
+
+  public PassiveGossipThread(GossipManager gossipManager, GossipCore 
gossipCore) {
+    this.gossipManager = gossipManager;
+    this.gossipCore = gossipCore;
+    if (gossipManager.getMyself().getClusterName() == null){
+      throw new IllegalArgumentException("Cluster was null");
+    }
+    try {
+      SocketAddress socketAddress = new 
InetSocketAddress(gossipManager.getMyself().getUri().getHost(),
+              gossipManager.getMyself().getUri().getPort());
+      server = new DatagramSocket(socketAddress);
+    } catch (SocketException ex) {
+      LOGGER.warn(ex);
+      throw new RuntimeException(ex);
+    }
+    keepRunning = new AtomicBoolean(true);
+    signed = 
gossipManager.getRegistry().meter(PassiveGossipConstants.SIGNED_MESSAGE);
+    unsigned = 
gossipManager.getRegistry().meter(PassiveGossipConstants.UNSIGNED_MESSAGE);
+  }
+
+  @Override
+  public void run() {
+    while (keepRunning.get()) {
+      try {
+        byte[] buf = new byte[server.getReceiveBufferSize()];
+        DatagramPacket p = new DatagramPacket(buf, buf.length);
+        server.receive(p);
+        debug(p.getData());
+        try {
+          Base activeGossipMessage = 
gossipManager.getObjectMapper().readValue(p.getData(), Base.class);
+          if (activeGossipMessage instanceof SignedPayload){
+            SignedPayload s = (SignedPayload) activeGossipMessage;
+            Base nested = 
gossipManager.getObjectMapper().readValue(s.getData(), Base.class);
+            gossipCore.receive(nested);
+            signed.mark();
+          } else {
+            gossipCore.receive(activeGossipMessage);
+            unsigned.mark();
+          }
+          gossipManager.getMemberStateRefresher().run();
+        } catch (RuntimeException ex) {//TODO trap json exception
+          LOGGER.error("Unable to process message", ex);
+        }
+      } catch (IOException e) {
+        LOGGER.error(e);
+        keepRunning.set(false);
+      }
+    }
+    shutdown();
+  }
+
+  private void debug(byte[] jsonBytes) {
+    if (LOGGER.isDebugEnabled()){
+      String receivedMessage = new String(jsonBytes);
+      LOGGER.debug("Received message ( bytes): " + receivedMessage);
+    }
+  }
+
+  public void shutdown() {
+    try {
+      server.close();
+    } catch (RuntimeException ex) {
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java 
b/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java
new file mode 100644
index 0000000..7e42562
--- /dev/null
+++ 
b/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ 
+package org.apache.gossip.manager;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.NavigableSet;
+
+import org.apache.gossip.LocalMember;
+import org.apache.log4j.Logger;
+
+public class RingStatePersister implements Runnable {
+
+  private static final Logger LOGGER = 
Logger.getLogger(RingStatePersister.class);
+  private GossipManager parent;
+  
+  public RingStatePersister(GossipManager parent){
+    this.parent = parent;
+  }
+  
+  @Override
+  public void run() {
+    writeToDisk();
+  }
+  
+  File computeTarget(){
+    return new File(parent.getSettings().getPathToRingState(), "ringstate." + 
parent.getMyself().getClusterName() + "." 
+            + parent.getMyself().getId() + ".json");
+  }
+  
+  void writeToDisk(){
+    if (!parent.getSettings().isPersistRingState()){
+      return;
+    }
+    NavigableSet<LocalMember> i = parent.getMembers().keySet();
+    try (FileOutputStream fos = new FileOutputStream(computeTarget())){
+      parent.getObjectMapper().writeValue(fos, i);
+    } catch (IOException e) {
+      LOGGER.debug(e);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  List<LocalMember> readFromDisk(){
+    if (!parent.getSettings().isPersistRingState()){
+      return Collections.emptyList();
+    }
+    try (FileInputStream fos = new FileInputStream(computeTarget())){
+      return parent.getObjectMapper().readValue(fos, ArrayList.class);
+    } catch (IOException e) {
+      LOGGER.debug(e);
+    }
+    return Collections.emptyList();
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java
 
b/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java
new file mode 100644
index 0000000..e47fe2a
--- /dev/null
+++ 
b/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.gossip.LocalMember;
+
+import com.codahale.metrics.MetricRegistry;
+
+/**
+ * Base implementation gossips randomly to live nodes periodically gossips to 
dead ones
+ *
+ */
+public class SimpleActiveGossipper extends AbstractActiveGossiper {
+
+  private ScheduledExecutorService scheduledExecutorService;
+  private final BlockingQueue<Runnable> workQueue;
+  private ThreadPoolExecutor threadService;
+  
+  public SimpleActiveGossipper(GossipManager gossipManager, GossipCore 
gossipCore,
+          MetricRegistry registry) {
+    super(gossipManager, gossipCore, registry);
+    scheduledExecutorService = Executors.newScheduledThreadPool(2);
+    workQueue = new ArrayBlockingQueue<Runnable>(1024);
+    threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, 
workQueue,
+            new ThreadPoolExecutor.DiscardOldestPolicy());
+  }
+
+  @Override
+  public void init() {
+    super.init();
+    scheduledExecutorService.scheduleAtFixedRate(() -> {
+      threadService.execute(() -> {
+        sendToALiveMember();
+      });
+    }, 0, gossipManager.getSettings().getGossipInterval(), 
TimeUnit.MILLISECONDS);
+    scheduledExecutorService.scheduleAtFixedRate(() -> {
+      sendToDeadMember();
+    }, 0, gossipManager.getSettings().getGossipInterval(), 
TimeUnit.MILLISECONDS);
+    scheduledExecutorService.scheduleAtFixedRate(
+            () -> sendPerNodeData(gossipManager.getMyself(),
+                    selectPartner(gossipManager.getLiveMembers())),
+            0, gossipManager.getSettings().getGossipInterval(), 
TimeUnit.MILLISECONDS);
+    scheduledExecutorService.scheduleAtFixedRate(
+            () -> sendSharedData(gossipManager.getMyself(),
+                    selectPartner(gossipManager.getLiveMembers())),
+            0, gossipManager.getSettings().getGossipInterval(), 
TimeUnit.MILLISECONDS);
+  }
+  
+  @Override
+  public void shutdown() {
+    super.shutdown();
+    scheduledExecutorService.shutdown();
+    try {
+      scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOGGER.debug("Issue during shutdown", e);
+    }
+    sendShutdownMessage();
+    threadService.shutdown();
+    try {
+      threadService.awaitTermination(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOGGER.debug("Issue during shutdown", e);
+    }
+  }
+
+  protected void sendToALiveMember(){
+    LocalMember member = selectPartner(gossipManager.getLiveMembers());
+    sendMembershipList(gossipManager.getMyself(), member);
+  }
+  
+  protected void sendToDeadMember(){
+    LocalMember member = selectPartner(gossipManager.getDeadMembers());
+    sendMembershipList(gossipManager.getMyself(), member);
+  }
+  
+  /**
+   * sends an optimistic shutdown message to several clusters nodes
+   */
+  protected void sendShutdownMessage(){
+    List<LocalMember> l = gossipManager.getLiveMembers();
+    int sendTo = l.size() < 3 ? 1 : l.size() / 2;
+    for (int i = 0; i < sendTo; i++) {
+      threadService.execute(() -> 
sendShutdownMessage(gossipManager.getMyself(), selectPartner(l)));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/manager/SystemClock.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/main/java/org/apache/gossip/manager/SystemClock.java 
b/gossip-base/src/main/java/org/apache/gossip/manager/SystemClock.java
new file mode 100644
index 0000000..04a7080
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/SystemClock.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+public class SystemClock implements Clock {
+
+  @Override
+  public long currentTimeMillis() {
+    return System.currentTimeMillis();
+  }
+
+  @Override
+  public long nanoTime() {
+    return System.nanoTime();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/manager/UserDataPersister.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/main/java/org/apache/gossip/manager/UserDataPersister.java 
b/gossip-base/src/main/java/org/apache/gossip/manager/UserDataPersister.java
new file mode 100644
index 0000000..3b9eafa
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/UserDataPersister.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ 
+package org.apache.gossip.manager;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.gossip.model.PerNodeDataMessage;
+import org.apache.gossip.model.SharedDataMessage;
+import org.apache.log4j.Logger;
+
+public class UserDataPersister implements Runnable {
+  
+  private static final Logger LOGGER = 
Logger.getLogger(UserDataPersister.class);
+  private final GossipManager parent;
+  private final GossipCore gossipCore; 
+  
+  UserDataPersister(GossipManager parent, GossipCore gossipCore){
+    this.parent = parent;
+    this.gossipCore = gossipCore;
+  }
+  
+  File computeSharedTarget(){
+    return new File(parent.getSettings().getPathToDataState(), "shareddata."
+            + parent.getMyself().getClusterName() + "." + 
parent.getMyself().getId() + ".json");
+  }
+  
+  File computePerNodeTarget() {
+    return new File(parent.getSettings().getPathToDataState(), "pernodedata."
+            + parent.getMyself().getClusterName() + "." + 
parent.getMyself().getId() + ".json");
+  }
+  
+  @SuppressWarnings("unchecked")
+  ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> 
readPerNodeFromDisk(){
+    if (!parent.getSettings().isPersistDataState()){
+      return new ConcurrentHashMap<String, ConcurrentHashMap<String, 
PerNodeDataMessage>>();
+    }
+    try (FileInputStream fos = new FileInputStream(computePerNodeTarget())){
+      return parent.getObjectMapper().readValue(fos, ConcurrentHashMap.class);
+    } catch (IOException e) {
+      LOGGER.debug(e);
+    }
+    return new ConcurrentHashMap<String, ConcurrentHashMap<String, 
PerNodeDataMessage>>();
+  }
+  
+  void writePerNodeToDisk(){
+    if (!parent.getSettings().isPersistDataState()){
+      return;
+    }
+    try (FileOutputStream fos = new FileOutputStream(computePerNodeTarget())){
+      parent.getObjectMapper().writeValue(fos, gossipCore.getPerNodeData());
+    } catch (IOException e) {
+      LOGGER.warn(e);
+    }
+  }
+  
+  void writeSharedToDisk(){
+    if (!parent.getSettings().isPersistDataState()){
+      return;
+    }
+    try (FileOutputStream fos = new FileOutputStream(computeSharedTarget())){
+      parent.getObjectMapper().writeValue(fos, gossipCore.getSharedData());
+    } catch (IOException e) {
+      LOGGER.warn(e);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  ConcurrentHashMap<String, SharedDataMessage> readSharedDataFromDisk(){
+    if (!parent.getSettings().isPersistRingState()){
+      return new ConcurrentHashMap<String, SharedDataMessage>();
+    }
+    try (FileInputStream fos = new FileInputStream(computeSharedTarget())){
+      return parent.getObjectMapper().readValue(fos, ConcurrentHashMap.class);
+    } catch (IOException e) {
+      LOGGER.debug(e);
+    }
+    return new ConcurrentHashMap<String, SharedDataMessage>();
+  }
+  
+  /**
+   * Writes all pernode and shared data to disk 
+   */
+  @Override
+  public void run() {
+    writePerNodeToDisk();
+    writeSharedToDisk();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java
 
b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java
new file mode 100644
index 0000000..f5e568e
--- /dev/null
+++ 
b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager.handlers;
+
+import org.apache.gossip.Member;
+import org.apache.gossip.RemoteMember;
+import org.apache.gossip.manager.GossipCore;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.model.Base;
+import org.apache.gossip.udp.UdpActiveGossipMessage;
+import org.apache.gossip.udp.UdpActiveGossipOk;
+import org.apache.gossip.udp.UdpNotAMemberFault;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ActiveGossipMessageHandler implements MessageHandler {
+  @Override
+  public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base 
base) {
+    List<Member> remoteGossipMembers = new ArrayList<>();
+    RemoteMember senderMember = null;
+    UdpActiveGossipMessage activeGossipMessage = (UdpActiveGossipMessage) base;
+    for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) {
+      URI u;
+      try {
+        u = new URI(activeGossipMessage.getMembers().get(i).getUri());
+      } catch (URISyntaxException e) {
+        GossipCore.LOGGER.debug("Gossip message with faulty URI", e);
+        continue;
+      }
+      RemoteMember member = new RemoteMember(
+              activeGossipMessage.getMembers().get(i).getCluster(),
+              u,
+              activeGossipMessage.getMembers().get(i).getId(),
+              activeGossipMessage.getMembers().get(i).getHeartbeat(),
+              activeGossipMessage.getMembers().get(i).getProperties());
+      if (i == 0) {
+        senderMember = member;
+      }
+      if 
(!(member.getClusterName().equals(gossipManager.getMyself().getClusterName()))) 
{
+        UdpNotAMemberFault f = new UdpNotAMemberFault();
+        f.setException("Not a member of this cluster " + i);
+        f.setUriFrom(activeGossipMessage.getUriFrom());
+        f.setUuid(activeGossipMessage.getUuid());
+        GossipCore.LOGGER.warn(f);
+        gossipCore.sendOneWay(f, member.getUri());
+        continue;
+      }
+      remoteGossipMembers.add(member);
+    }
+    UdpActiveGossipOk o = new UdpActiveGossipOk();
+    o.setUriFrom(activeGossipMessage.getUriFrom());
+    o.setUuid(activeGossipMessage.getUuid());
+    gossipCore.sendOneWay(o, senderMember.getUri());
+    gossipCore.mergeLists(gossipManager, senderMember, remoteGossipMembers);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java
 
b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java
new file mode 100644
index 0000000..5b78ce3
--- /dev/null
+++ 
b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gossip.manager.handlers;
+
+import org.apache.gossip.manager.GossipCore;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.model.*;
+
+public class DefaultMessageInvoker implements MessageInvoker {
+  private final MessageInvokerCombiner mic;
+
+  public DefaultMessageInvoker() {
+    mic = new MessageInvokerCombiner();
+    mic.add(new SimpleMessageInvoker(Response.class, new ResponseHandler()));
+    mic.add(new SimpleMessageInvoker(ShutdownMessage.class, new 
ShutdownMessageHandler()));
+    mic.add(new SimpleMessageInvoker(PerNodeDataMessage.class, new 
PerNodeDataMessageHandler()));
+    mic.add(new SimpleMessageInvoker(SharedDataMessage.class, new 
SharedDataMessageHandler()));
+    mic.add(new SimpleMessageInvoker(ActiveGossipMessage.class, new 
ActiveGossipMessageHandler()));
+  }
+
+  public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, 
Base base) {
+    return mic.invoke(gossipCore, gossipManager, base);
+  }
+}


Reply via email to