http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/src/main/java/org/apache/gossip/crdt/GrowOnlyCounter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/crdt/GrowOnlyCounter.java 
b/src/main/java/org/apache/gossip/crdt/GrowOnlyCounter.java
deleted file mode 100644
index dd1505a..0000000
--- a/src/main/java/org/apache/gossip/crdt/GrowOnlyCounter.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gossip.crdt;
-
-import org.apache.gossip.manager.GossipManager;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class GrowOnlyCounter implements CrdtCounter<Long, GrowOnlyCounter> {
-  
-  private final Map<String, Long> counters = new HashMap<>();
-  
-  GrowOnlyCounter(Map<String, Long> counters) {
-    this.counters.putAll(counters);
-  }
-  
-  public GrowOnlyCounter(GrowOnlyCounter growOnlyCounter, Builder builder) {
-    counters.putAll(growOnlyCounter.counters);
-    if (counters.containsKey(builder.myId)) {
-      Long newValue = counters.get(builder.myId) + builder.counter;
-      counters.replace(builder.myId, newValue);
-    } else {
-      counters.put(builder.myId, builder.counter);
-    }
-  }
-  
-  public GrowOnlyCounter(Builder builder) {
-    counters.put(builder.myId, builder.counter);
-  }
-  
-  public GrowOnlyCounter(GossipManager manager) {
-    counters.put(manager.getMyself().getId(), 0L);
-  }
-  
-  public GrowOnlyCounter(GrowOnlyCounter growOnlyCounter, GrowOnlyCounter 
other) {
-    counters.putAll(growOnlyCounter.counters);
-    for (Map.Entry<String, Long> entry : other.counters.entrySet()) {
-      String otherKey = entry.getKey();
-      Long otherValue = entry.getValue();
-      
-      if (counters.containsKey(otherKey)) {
-        Long newValue = Math.max(counters.get(otherKey), otherValue);
-        counters.replace(otherKey, newValue);
-      } else {
-        counters.put(otherKey, otherValue);
-      }
-    }
-  }
-  
-  @Override
-  public GrowOnlyCounter merge(GrowOnlyCounter other) {
-    return new GrowOnlyCounter(this, other);
-  }
-  
-  @Override
-  public Long value() {
-    Long globalCount = 0L;
-    for (Long increment : counters.values()) {
-      globalCount += increment;
-    }
-    return globalCount;
-  }
-  
-  @Override
-  public GrowOnlyCounter optimize() {
-    return new GrowOnlyCounter(counters);
-  }
-  
-  @Override
-  public boolean equals(Object obj) {
-    if (getClass() != obj.getClass())
-      return false;
-    GrowOnlyCounter other = (GrowOnlyCounter) obj;
-    return value().longValue() == other.value().longValue();
-  }
-  
-  @Override
-  public String toString() {
-    return "GrowOnlyCounter [counters= " + counters + ", Value=" + value() + 
"]";
-  }
-  
-  Map<String, Long> getCounters() {
-    return counters;
-  }
-  
-  public static class Builder {
-    
-    private final String myId;
-    
-    private Long counter;
-    
-    public Builder(GossipManager gossipManager) {
-      myId = gossipManager.getMyself().getId();
-      counter = 0L;
-    }
-    
-    public GrowOnlyCounter.Builder increment(Long count) {
-      counter += count;
-      return this;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/src/main/java/org/apache/gossip/crdt/GrowOnlySet.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/crdt/GrowOnlySet.java 
b/src/main/java/org/apache/gossip/crdt/GrowOnlySet.java
deleted file mode 100644
index 9e2dd49..0000000
--- a/src/main/java/org/apache/gossip/crdt/GrowOnlySet.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.crdt;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.Set;
-
-public class GrowOnlySet<ElementType> implements CrdtSet<ElementType, 
Set<ElementType>, GrowOnlySet<ElementType>>{
-
-  private final Set<ElementType> hidden = new LinkedHashSet<>();
-  
-  @SuppressWarnings("unused")
-  /*
-   * Used by SerDe
-   */
-  private GrowOnlySet(){
-    
-  }
-  
-  public GrowOnlySet(Set<ElementType> c){
-    hidden.addAll(c);
-  }
-  
-  public GrowOnlySet(Collection<ElementType> c){
-    hidden.addAll(c);
-  }
-  
-  public GrowOnlySet(GrowOnlySet<ElementType> first, GrowOnlySet<ElementType> 
second){
-    hidden.addAll(first.value());
-    hidden.addAll(second.value());
-  }
-  
-  @Override
-  public GrowOnlySet<ElementType> merge(GrowOnlySet<ElementType> other) {
-    return new GrowOnlySet<>(this, other);
-  }
-
-  @Override
-  public Set<ElementType> value() {
-    Set<ElementType> copy = new LinkedHashSet<>();
-    copy.addAll(hidden);
-    return Collections.unmodifiableSet(copy);
-  }
-  
-  @Override
-  public GrowOnlySet<ElementType> optimize() {
-    return new GrowOnlySet<>(hidden);
-  }
-
-  public int size() {
-    return hidden.size();
-  }
-
-  public boolean isEmpty() {
-    return hidden.isEmpty();
-  }
-
-  public boolean contains(Object o) {
-    return hidden.contains(o);
-  }
-
-  public Iterator<ElementType> iterator() {
-    Set<ElementType> copy = new HashSet<>();
-    copy.addAll(hidden);
-    return copy.iterator();
-  }
-
-  public Object[] toArray() {
-    return hidden.toArray();
-  }
-
-  public <T> T[] toArray(T[] a) {
-    return hidden.toArray(a);
-  }
-
-  public boolean add(ElementType e) {
-    throw new UnsupportedOperationException();
-  }
-
-  public boolean remove(Object o) {
-    throw new UnsupportedOperationException();
-  }
-
-  public boolean containsAll(Collection<?> c) {
-    return hidden.containsAll(c);
-  }
-
-  public boolean addAll(Collection<? extends ElementType> c) {
-    throw new UnsupportedOperationException();
-  }
-
-  public boolean retainAll(Collection<?> c) {
-    throw new UnsupportedOperationException();
-  }
-
-  public boolean removeAll(Collection<?> c) {
-    throw new UnsupportedOperationException();
-  }
-
-  public void clear() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public String toString() {
-    return "GrowOnlySet [hidden=" + hidden + "]";
-  }
-
-  @Override
-  public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + ((hidden == null) ? 0 : hidden.hashCode());
-    return result;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    @SuppressWarnings("rawtypes")
-    GrowOnlySet other = (GrowOnlySet) obj;
-    if (hidden == null) {
-      if (other.hidden != null)
-        return false;
-    } else if (!hidden.equals(other.hidden))
-      return false;
-    return true;
-  }
-
-  Set<ElementType> getElements(){
-    return hidden;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/src/main/java/org/apache/gossip/crdt/OrSet.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/crdt/OrSet.java 
b/src/main/java/org/apache/gossip/crdt/OrSet.java
deleted file mode 100644
index f84dbc7..0000000
--- a/src/main/java/org/apache/gossip/crdt/OrSet.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.crdt;
-
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.function.BiConsumer;
-
-import org.apache.gossip.crdt.OrSet.Builder.Operation;
-
-/*
- * A immutable set 
- */
-public class OrSet<E>  implements Crdt<Set<E>, OrSet<E>> {
-  
-  private final Map<E, Set<UUID>> elements = new HashMap<>();
-  private final Map<E, Set<UUID>> tombstones = new HashMap<>();
-  private final transient Set<E> val;
-  
-  public OrSet(){
-    val = computeValue();
-  }
-  
-  OrSet(Map<E, Set<UUID>> elements, Map<E, Set<UUID>> tombstones){
-    this.elements.putAll(elements);
-    this.tombstones.putAll(tombstones);
-    val = computeValue();
-  }
-  
-  @SafeVarargs
-  public OrSet(E ... elements){
-    for (E e: elements){
-      internalAdd(e);
-    }
-    val = computeValue();
-  }
-  
-  public OrSet(Builder<E>builder){
-    for (Builder<E>.OrSetElement<E> e: builder.elements){
-      if (e.operation == Operation.ADD){
-        internalAdd(e.element);
-      } else {
-        internalRemove(e.element);
-      }
-    }
-    val = computeValue();
-  }
-  
-  /**
-   * This constructor is the way to remove elements from an existing set
-   * @param set
-   * @param builder 
-   */
-  public OrSet(OrSet<E> set, Builder<E> builder){
-    elements.putAll(set.elements);
-    tombstones.putAll(set.tombstones);
-    for (Builder<E>.OrSetElement<E> e: builder.elements){
-      if (e.operation == Operation.ADD){
-        internalAdd(e.element);
-      } else {
-        internalRemove(e.element);
-      }
-    }
-    val = computeValue();
-  }
-
-  static Set<UUID> mergeSets(Set<UUID> a, Set<UUID> b) {
-    if ((a == null || a.isEmpty()) && (b == null || b.isEmpty())) {
-      return null;
-    }
-    Set<UUID> res = new HashSet<>(a);
-    res.addAll(b);
-    return res;
-  }
-
-  private void internalSetMerge(Map<E, Set<UUID>> map, E key, Set<UUID> value) 
{
-    if (value == null) {
-      return;
-    }
-    map.merge(key, value, OrSet::mergeSets);
-  }
-
-  public OrSet(OrSet<E> left, OrSet<E> right){
-    BiConsumer<Map<E, Set<UUID>>, Map<E, Set<UUID>>> internalMerge = (items, 
other) -> {
-      for (Entry<E, Set<UUID>> l : other.entrySet()){
-        internalSetMerge(items, l.getKey(), l.getValue());
-      }
-    };
-
-    internalMerge.accept(elements, left.elements);
-    internalMerge.accept(elements, right.elements);
-    internalMerge.accept(tombstones, left.tombstones);
-    internalMerge.accept(tombstones, right.tombstones);
-
-    val = computeValue();
-  }
-  
-  public OrSet.Builder<E> builder(){
-    return new OrSet.Builder<>();
-  }
-  
-  @Override
-  public OrSet<E> merge(OrSet<E> other) {
-    return new OrSet<E>(this, other);
-  }
-  
-  private void internalAdd(E element) {
-    Set<UUID> toMerge = new HashSet<>();
-    toMerge.add(UUID.randomUUID());
-    internalSetMerge(elements, element, toMerge);
-  }
-  
-  private void internalRemove(E element){
-    internalSetMerge(tombstones, element, elements.get(element));
-  }
-
-  /*
-   * Computes the live values by analyzing the elements and tombstones
-   */
-  private Set<E> computeValue(){
-    Set<E> values = new HashSet<>();
-    for (Entry<E, Set<UUID>> entry: elements.entrySet()){
-      Set<UUID> deleteIds = tombstones.get(entry.getKey());
-      // if not all tokens for current element are in tombstones
-      if (deleteIds == null || !deleteIds.containsAll(entry.getValue())) {
-        values.add(entry.getKey());
-      }
-    }
-    return values;
-  }
-  
-  @Override
-  public Set<E> value() {
-    return val;
-  }
-
-  @Override
-  public OrSet<E> optimize() {
-    return this;
-  }
-  
-  public static class Builder<E> {
-    public static enum Operation {
-      ADD, REMOVE
-    };
-
-    private class OrSetElement<EL> {
-      EL element;
-      Operation operation;
-
-      private OrSetElement(EL element, Operation operation) {
-        this.element = element;
-        this.operation = operation;
-      }
-    }
-
-    private List<OrSetElement<E>> elements = new ArrayList<>();
-
-    public Builder<E> add(E element) {
-      elements.add(new OrSetElement<E>(element, Operation.ADD));
-      return this;
-    }
-
-    public Builder<E> remove(E element) {
-      elements.add(new OrSetElement<E>(element, Operation.REMOVE));
-      return this;
-    }
-
-    public Builder<E> mutate(E element, Operation operation) {
-      elements.add(new OrSetElement<E>(element, operation));
-      return this;
-    }
-  }
-
-  
-  public int size() {
-    return value().size();
-  }
-
-  
-  public boolean isEmpty() {
-    return value().size() == 0;
-  }
-
-  
-  public boolean contains(Object o) {
-    return value().contains(o);
-  }
-
-  
-  public Iterator<E> iterator() {
-    Iterator<E> managed = value().iterator();
-    return new Iterator<E>() {
-
-      @Override
-      public void remove() {
-        throw new IllegalArgumentException();
-      }
-
-      @Override
-      public boolean hasNext() {
-        return managed.hasNext();
-      }
-
-      @Override
-      public E next() {
-        return managed.next();
-      }
-      
-    };
-  }
-
-  public Object[] toArray() {
-    return value().toArray();
-  }
-
-  public <T> T[] toArray(T[] a) {
-    return value().toArray(a);
-  }
-
-  public boolean add(E e) {
-    throw new IllegalArgumentException("Can not add");
-  }
-
-
-  public boolean remove(Object o) {
-    throw new IllegalArgumentException();
-  }
-
-  public boolean containsAll(Collection<?> c) {
-    return this.value().containsAll(c);
-  }
-
-  public boolean addAll(Collection<? extends E> c) {
-    throw new IllegalArgumentException();
-  }
-
-  public boolean retainAll(Collection<?> c) {
-    throw new IllegalArgumentException();
-  }
-
-  public boolean removeAll(Collection<?> c) {
-    throw new IllegalArgumentException();
-  }
-
-  public void clear() {
-    throw new IllegalArgumentException();
-  }
-
-  @Override
-  public String toString() {
-    return "OrSet [elements=" + elements + ", tombstones=" + tombstones + "]" ;
-  }
-
-  @Override
-  public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + ((value() == null) ? 0 : value().hashCode());
-    return result;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    @SuppressWarnings("rawtypes")
-    OrSet other = (OrSet) obj;
-    if (elements == null) {
-      if (other.elements != null)
-        return false;
-    } else if (!value().equals(other.value()))
-      return false;
-    return true;
-  }
-
-  Map<E, Set<UUID>> getElements() {
-    return elements;
-  }
-
-  Map<E, Set<UUID>> getTombstones() {
-    return tombstones;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/src/main/java/org/apache/gossip/event/GossipListener.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/event/GossipListener.java 
b/src/main/java/org/apache/gossip/event/GossipListener.java
deleted file mode 100644
index 9b33dab..0000000
--- a/src/main/java/org/apache/gossip/event/GossipListener.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.event;
-
-import org.apache.gossip.Member;
-
-public interface GossipListener {
-  void gossipEvent(Member member, GossipState state);
-}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/src/main/java/org/apache/gossip/event/GossipState.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/event/GossipState.java 
b/src/main/java/org/apache/gossip/event/GossipState.java
deleted file mode 100644
index 3b76c9e..0000000
--- a/src/main/java/org/apache/gossip/event/GossipState.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.event;
-
-public enum GossipState {
-  UP("up"), DOWN("down");
-  @SuppressWarnings("unused")
-  private final String state;
-
-  private GossipState(String state) {
-    this.state = state;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java 
b/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java
deleted file mode 100644
index 497894c..0000000
--- a/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gossip.examples;
-
-import java.net.URI;
-import java.net.UnknownHostException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.gossip.GossipSettings;
-import org.apache.gossip.RemoteMember;
-import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper;
-import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.manager.GossipManagerBuilder;
-
-public class StandAloneDatacenterAndRack {
-
-  public static void main (String [] args) throws UnknownHostException, 
InterruptedException {
-    GossipSettings s = new GossipSettings();
-    s.setWindowSize(1000);
-    s.setGossipInterval(100);
-    s.setActiveGossipClass(DatacenterRackAwareActiveGossiper.class.getName());
-    Map<String, String> gossipProps = new HashMap<>();
-    gossipProps.put("sameRackGossipIntervalMs", "2000");
-    gossipProps.put("differentDatacenterGossipIntervalMs", "10000");
-    s.setActiveGossipProperties(gossipProps);
-    Map<String, String> props = new HashMap<>();
-    props.put(DatacenterRackAwareActiveGossiper.DATACENTER, args[4]);
-    props.put(DatacenterRackAwareActiveGossiper.RACK, args[5]);
-    GossipManager manager = GossipManagerBuilder.newBuilder()
-            .cluster("mycluster")
-            .uri(URI.create(args[0]))
-            .id(args[1])
-            .gossipSettings(s)
-            .gossipMembers(Arrays.asList(new RemoteMember("mycluster", 
URI.create(args[2]), args[3])))
-            .properties(props)
-            .build();
-    manager.init();
-    while (true){
-      System.out.println("Live: " + manager.getLiveMembers());
-      System.out.println("Dead: " + manager.getDeadMembers());
-      Thread.sleep(2000);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/src/main/java/org/apache/gossip/examples/StandAloneNode.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/examples/StandAloneNode.java 
b/src/main/java/org/apache/gossip/examples/StandAloneNode.java
deleted file mode 100644
index 93421b1..0000000
--- a/src/main/java/org/apache/gossip/examples/StandAloneNode.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.examples;
-
-import java.net.URI;
-import java.net.UnknownHostException;
-import java.util.Arrays;
-import org.apache.gossip.GossipSettings;
-import org.apache.gossip.RemoteMember;
-import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.manager.GossipManagerBuilder;
-
-public class StandAloneNode {
-  public static void main (String [] args) throws UnknownHostException, 
InterruptedException{
-    GossipSettings s = new GossipSettings();
-    s.setWindowSize(1000);
-    s.setGossipInterval(100);
-    GossipManager gossipService = GossipManagerBuilder.newBuilder()
-            .cluster("mycluster")
-            .uri(URI.create(args[0]))
-            .id(args[1])
-            .gossipMembers(Arrays.asList( new RemoteMember("mycluster", 
URI.create(args[2]), args[3])))
-            .gossipSettings(s)
-            .build();
-    gossipService.init();
-    while (true){
-      System.out.println("Live: " + gossipService.getLiveMembers());
-      System.out.println("Dead: " + gossipService.getDeadMembers());
-      Thread.sleep(2000);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java 
b/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java
deleted file mode 100644
index d78cf5e..0000000
--- a/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.examples;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.URI;
-import java.util.Arrays;
-import org.apache.gossip.GossipSettings;
-import org.apache.gossip.RemoteMember;
-import org.apache.gossip.crdt.GrowOnlyCounter;
-import org.apache.gossip.crdt.OrSet;
-import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.manager.GossipManagerBuilder;
-import org.apache.gossip.model.SharedDataMessage;
-
-public class StandAloneNodeCrdtOrSet {
-  public static void main (String [] args) throws InterruptedException, 
IOException{
-    GossipSettings s = new GossipSettings();
-    s.setWindowSize(1000);
-    s.setGossipInterval(100);
-    GossipManager gossipService = GossipManagerBuilder.newBuilder()
-            .cluster("mycluster")
-            .uri(URI.create(args[0]))
-            .id(args[1])
-            .gossipMembers(Arrays.asList( new RemoteMember("mycluster", 
URI.create(args[2]), args[3])))
-            .gossipSettings(s)
-            .build();
-    gossipService.init();
-    
-    new Thread(() -> {
-      while (true){
-      System.out.println("Live: " + gossipService.getLiveMembers());
-      System.out.println("Dead: " + gossipService.getDeadMembers());
-      System.out.println("---------- " + (gossipService.findCrdt("abc") == 
null ? "": 
-          gossipService.findCrdt("abc").value()));
-      System.out.println("********** " + gossipService.findCrdt("abc"));
-      System.out.println("^^^^^^^^^^ " + (gossipService.findCrdt("def") == 
null ? "": 
-        gossipService.findCrdt("def").value()));
-      System.out.println("$$$$$$$$$$ " + gossipService.findCrdt("def"));
-      try {
-        Thread.sleep(2000);
-      } catch (Exception e) {}
-      }
-    }).start();
-    
-    String line = null;
-    try (BufferedReader br = new BufferedReader(new 
InputStreamReader(System.in))){
-      while ( (line = br.readLine()) != null){
-        System.out.println(line);
-        char op = line.charAt(0);
-        String val = line.substring(2);
-        if (op == 'a'){
-          addData(val, gossipService);
-        } else if (op == 'r') {
-          removeData(val, gossipService);
-        } else if (op == 'g'){
-          gcount(val, gossipService);
-        }
-      }
-    }
-  }
-  
-  private static void gcount(String val, GossipManager gossipManager){
-    GrowOnlyCounter c = (GrowOnlyCounter) gossipManager.findCrdt("def");
-    Long l = Long.valueOf(val);
-    if (c == null){
-      c = new GrowOnlyCounter(new 
GrowOnlyCounter.Builder(gossipManager).increment((l)));
-    } else {
-      c = new GrowOnlyCounter(c, new 
GrowOnlyCounter.Builder(gossipManager).increment((l)));
-    }
-    SharedDataMessage m = new SharedDataMessage();
-    m.setExpireAt(Long.MAX_VALUE);
-    m.setKey("def");
-    m.setPayload(c);
-    m.setTimestamp(System.currentTimeMillis());
-    gossipManager.merge(m);
-  }
-  
-  private static void removeData(String val, GossipManager gossipService){
-    @SuppressWarnings("unchecked")
-    OrSet<String> s = (OrSet<String>) gossipService.findCrdt("abc");
-    SharedDataMessage m = new SharedDataMessage();
-    m.setExpireAt(Long.MAX_VALUE);
-    m.setKey("abc");
-    m.setPayload(new OrSet<String>(s , new 
OrSet.Builder<String>().remove(val)));
-    m.setTimestamp(System.currentTimeMillis());
-    gossipService.merge(m);
-  }
-  
-  private static void addData(String val, GossipManager gossipService){
-    SharedDataMessage m = new SharedDataMessage();
-    m.setExpireAt(Long.MAX_VALUE);
-    m.setKey("abc");
-    m.setPayload(new OrSet<String>(val));
-    m.setTimestamp(System.currentTimeMillis());
-    gossipService.merge(m);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java 
b/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
deleted file mode 100644
index b73550e..0000000
--- a/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.manager;
-
-import java.util.Map.Entry;
-import java.util.List;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.MetricRegistry;
-import org.apache.gossip.LocalMember;
-import org.apache.gossip.model.ActiveGossipOk;
-import org.apache.gossip.model.PerNodeDataMessage;
-import org.apache.gossip.model.Member;
-import org.apache.gossip.model.Response;
-import org.apache.gossip.model.SharedDataMessage;
-import org.apache.gossip.model.ShutdownMessage;
-import org.apache.gossip.udp.UdpActiveGossipMessage;
-import org.apache.gossip.udp.UdpPerNodeDataMessage;
-import org.apache.gossip.udp.UdpSharedDataMessage;
-import org.apache.log4j.Logger;
-
-import static com.codahale.metrics.MetricRegistry.name;
-
-/**
- * The ActiveGossipThread is sends information. Pick a random partner and send 
the membership list to that partner
- */
-public abstract class AbstractActiveGossiper {
-
-  protected static final Logger LOGGER = 
Logger.getLogger(AbstractActiveGossiper.class);
-  
-  protected final GossipManager gossipManager;
-  protected final GossipCore gossipCore;
-  private final Histogram sharedDataHistogram;
-  private final Histogram sendPerNodeDataHistogram;
-  private final Histogram sendMembershipHistorgram;
-  private final Random random;
-
-  public AbstractActiveGossiper(GossipManager gossipManager, GossipCore 
gossipCore, MetricRegistry registry) {
-    this.gossipManager = gossipManager;
-    this.gossipCore = gossipCore;
-    sharedDataHistogram = 
registry.histogram(name(AbstractActiveGossiper.class, 
"sharedDataHistogram-time"));
-    sendPerNodeDataHistogram = 
registry.histogram(name(AbstractActiveGossiper.class, 
"sendPerNodeDataHistogram-time"));
-    sendMembershipHistorgram = 
registry.histogram(name(AbstractActiveGossiper.class, 
"sendMembershipHistorgram-time"));
-    random = new Random();
-  }
-
-  public void init() {
-
-  }
-  
-  public void shutdown() {
-
-  }
-
-  public final void sendShutdownMessage(LocalMember me, LocalMember target){
-    if (target == null){
-      return;
-    }
-    ShutdownMessage m = new ShutdownMessage();
-    m.setNodeId(me.getId());
-    m.setShutdownAtNanos(gossipManager.getClock().nanoTime());
-    gossipCore.sendOneWay(m, target.getUri());
-  }
-  
-  public final void sendSharedData(LocalMember me, LocalMember member){
-    if (member == null){
-      return;
-    }
-    long startTime = System.currentTimeMillis();
-    for (Entry<String, SharedDataMessage> innerEntry : 
gossipCore.getSharedData().entrySet()){
-      UdpSharedDataMessage message = new UdpSharedDataMessage();
-      message.setUuid(UUID.randomUUID().toString());
-      message.setUriFrom(me.getId());
-      message.setExpireAt(innerEntry.getValue().getExpireAt());
-      message.setKey(innerEntry.getValue().getKey());
-      message.setNodeId(innerEntry.getValue().getNodeId());
-      message.setTimestamp(innerEntry.getValue().getTimestamp());
-      message.setPayload(innerEntry.getValue().getPayload());
-      gossipCore.sendOneWay(message, member.getUri());
-    }
-    sharedDataHistogram.update(System.currentTimeMillis() - startTime);
-  }
-  
-  public final void sendPerNodeData(LocalMember me, LocalMember member){
-    if (member == null){
-      return;
-    }
-    long startTime = System.currentTimeMillis();
-    for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> entry : 
gossipCore.getPerNodeData().entrySet()){
-      for (Entry<String, PerNodeDataMessage> innerEntry : 
entry.getValue().entrySet()){
-        UdpPerNodeDataMessage message = new UdpPerNodeDataMessage();
-        message.setUuid(UUID.randomUUID().toString());
-        message.setUriFrom(me.getId());
-        message.setExpireAt(innerEntry.getValue().getExpireAt());
-        message.setKey(innerEntry.getValue().getKey());
-        message.setNodeId(innerEntry.getValue().getNodeId());
-        message.setTimestamp(innerEntry.getValue().getTimestamp());
-        message.setPayload(innerEntry.getValue().getPayload());
-        gossipCore.sendOneWay(message, member.getUri());   
-      }
-    }
-    sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime);
-  }
-    
-  /**
-   * Performs the sending of the membership list, after we have incremented 
our own heartbeat.
-   */
-  protected void sendMembershipList(LocalMember me, LocalMember member) {
-    if (member == null){
-      return;
-    }
-    long startTime = System.currentTimeMillis();
-    me.setHeartbeat(System.nanoTime());
-    UdpActiveGossipMessage message = new UdpActiveGossipMessage();
-    message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
-    message.setUuid(UUID.randomUUID().toString());
-    message.getMembers().add(convert(me));
-    for (LocalMember other : gossipManager.getMembers().keySet()) {
-      message.getMembers().add(convert(other));
-    }
-    Response r = gossipCore.send(message, member.getUri());
-    if (r instanceof ActiveGossipOk){
-      //maybe count metrics here
-    } else {
-      LOGGER.debug("Message " + message + " generated response " + r);
-    }
-    sendMembershipHistorgram.update(System.currentTimeMillis() - startTime);
-  }
-    
-  protected final Member convert(LocalMember member){
-    Member gm = new Member();
-    gm.setCluster(member.getClusterName());
-    gm.setHeartbeat(member.getHeartbeat());
-    gm.setUri(member.getUri().toASCIIString());
-    gm.setId(member.getId());
-    gm.setProperties(member.getProperties());
-    return gm;
-  }
-  
-  /**
-   * 
-   * @param memberList
-   *          An immutable list
-   * @return The chosen LocalGossipMember to gossip with.
-   */
-  protected LocalMember selectPartner(List<LocalMember> memberList) {
-    LocalMember member = null;
-    if (memberList.size() > 0) {
-      int randomNeighborIndex = random.nextInt(memberList.size());
-      member = memberList.get(randomNeighborIndex);
-    }
-    return member;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/src/main/java/org/apache/gossip/manager/Clock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/Clock.java 
b/src/main/java/org/apache/gossip/manager/Clock.java
deleted file mode 100644
index 6629c62..0000000
--- a/src/main/java/org/apache/gossip/manager/Clock.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.manager;
-
-public interface Clock {
-
-  long currentTimeMillis();
-  long nanoTime();
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/src/main/java/org/apache/gossip/manager/DataReaper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/DataReaper.java 
b/src/main/java/org/apache/gossip/manager/DataReaper.java
deleted file mode 100644
index 8175a1b..0000000
--- a/src/main/java/org/apache/gossip/manager/DataReaper.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.manager;
-
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.gossip.model.PerNodeDataMessage;
-import org.apache.gossip.model.SharedDataMessage;
-
-/**
- * We wish to periodically sweep user data and remove entries past their 
timestamp. This
- * implementation periodically sweeps through the data and removes old 
entries. While it might make
- * sense to use a more specific high performance data-structure to handle 
eviction, keep in mind
- * that we are not looking to store a large quantity of data as we currently 
have to transmit this
- * data cluster wide.
- */
-public class DataReaper {
-
-  private final GossipCore gossipCore;
-  private final ScheduledExecutorService scheduledExecutor = 
Executors.newScheduledThreadPool(1);
-  private final Clock clock;
-  
-  public DataReaper(GossipCore gossipCore, Clock clock){
-    this.gossipCore = gossipCore;
-    this.clock = clock;
-  }
-  
-  public void init(){
-    Runnable reapPerNodeData = () -> {
-      runPerNodeOnce();
-      runSharedOnce();
-    };
-    scheduledExecutor.scheduleAtFixedRate(reapPerNodeData, 0, 5, 
TimeUnit.SECONDS);
-  }
-  
-  void runSharedOnce(){
-    for (Entry<String, SharedDataMessage> entry : 
gossipCore.getSharedData().entrySet()){
-      if (entry.getValue().getExpireAt() < clock.currentTimeMillis()){
-        gossipCore.getSharedData().remove(entry.getKey(), entry.getValue());
-      }
-    }
-  }
-  
-  void runPerNodeOnce(){
-    for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> node : 
gossipCore.getPerNodeData().entrySet()){
-      reapData(node.getValue());
-    }
-  }
-  
-  void reapData(ConcurrentHashMap<String, PerNodeDataMessage> 
concurrentHashMap){
-    for (Entry<String, PerNodeDataMessage> entry : 
concurrentHashMap.entrySet()){
-      if (entry.getValue().getExpireAt() < clock.currentTimeMillis()){
-        concurrentHashMap.remove(entry.getKey(), entry.getValue());
-      }
-    }
-  }
-  
-  public void close(){
-    scheduledExecutor.shutdown();
-    try {
-      scheduledExecutor.awaitTermination(1, TimeUnit.SECONDS);
-    } catch (InterruptedException e) {
-      
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java
 
b/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java
deleted file mode 100644
index 2f489a2..0000000
--- 
a/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.manager;
-
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.gossip.LocalMember;
-
-import com.codahale.metrics.MetricRegistry;
-
-/**
- * Sends gossip traffic at different rates to other racks and data-centers.
- * This implementation controls the rate at which gossip traffic is shared. 
- * There are two constructs Datacenter and Rack. It is assumed that bandwidth 
and latency is higher
- * in the rack than in the the datacenter. We can adjust the rate at which we 
send messages to each group.
- * 
- */
-public class DatacenterRackAwareActiveGossiper extends AbstractActiveGossiper {
-
-  public static final String DATACENTER = "datacenter";
-  public static final String RACK = "rack";
-  
-  private int sameRackGossipIntervalMs = 100;
-  private int sameDcGossipIntervalMs = 500;
-  private int differentDatacenterGossipIntervalMs = 1000;
-  private int randomDeadMemberSendIntervalMs = 250;
-  
-  private ScheduledExecutorService scheduledExecutorService;
-  private final BlockingQueue<Runnable> workQueue;
-  private ThreadPoolExecutor threadService;
-  
-  public DatacenterRackAwareActiveGossiper(GossipManager gossipManager, 
GossipCore gossipCore,
-          MetricRegistry registry) {
-    super(gossipManager, gossipCore, registry);
-    scheduledExecutorService = Executors.newScheduledThreadPool(2);
-    workQueue = new ArrayBlockingQueue<Runnable>(1024);
-    threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, 
workQueue,
-            new ThreadPoolExecutor.DiscardOldestPolicy());
-    try {
-      sameRackGossipIntervalMs = Integer.parseInt(gossipManager.getSettings()
-              .getActiveGossipProperties().get("sameRackGossipIntervalMs"));
-    } catch (RuntimeException ex) { }
-    try {
-      sameDcGossipIntervalMs = Integer.parseInt(gossipManager.getSettings()
-              .getActiveGossipProperties().get("sameDcGossipIntervalMs"));
-    } catch (RuntimeException ex) { }
-    try {
-      differentDatacenterGossipIntervalMs = 
Integer.parseInt(gossipManager.getSettings()
-              
.getActiveGossipProperties().get("differentDatacenterGossipIntervalMs"));
-    } catch (RuntimeException ex) { }
-    try {
-      randomDeadMemberSendIntervalMs = 
Integer.parseInt(gossipManager.getSettings()
-              
.getActiveGossipProperties().get("randomDeadMemberSendIntervalMs"));
-    } catch (RuntimeException ex) { }
-  }
-
-  @Override
-  public void init() {
-    super.init();
-    //same rack
-    scheduledExecutorService.scheduleAtFixedRate(() -> 
-      threadService.execute(() -> sendToSameRackMember()), 
-      0, sameRackGossipIntervalMs, TimeUnit.MILLISECONDS);
-    
-    scheduledExecutorService.scheduleAtFixedRate(() -> 
-      threadService.execute(() -> sendToSameRackMemberPerNode()), 
-      0, sameRackGossipIntervalMs, TimeUnit.MILLISECONDS);
-    
-    scheduledExecutorService.scheduleAtFixedRate(() -> 
-      threadService.execute(() -> sendToSameRackShared()), 
-      0, sameRackGossipIntervalMs, TimeUnit.MILLISECONDS);
-    
-    //same dc different rack
-    scheduledExecutorService.scheduleAtFixedRate(() -> 
-      threadService.execute(() -> sameDcDiffernetRackMember()), 
-      0, sameDcGossipIntervalMs, TimeUnit.MILLISECONDS);
-    
-    scheduledExecutorService.scheduleAtFixedRate(() -> 
-    threadService.execute(() -> sameDcDiffernetRackPerNode()), 
-    0, sameDcGossipIntervalMs, TimeUnit.MILLISECONDS);
-    
-    scheduledExecutorService.scheduleAtFixedRate(() -> 
-    threadService.execute(() -> sameDcDiffernetRackShared()), 
-    0, sameDcGossipIntervalMs, TimeUnit.MILLISECONDS);
-    
-    //different dc
-    scheduledExecutorService.scheduleAtFixedRate(() -> 
-      threadService.execute(() -> differentDcMember()), 
-      0, differentDatacenterGossipIntervalMs, TimeUnit.MILLISECONDS);
-    
-    scheduledExecutorService.scheduleAtFixedRate(() -> 
-    threadService.execute(() -> differentDcPerNode()), 
-    0, differentDatacenterGossipIntervalMs, TimeUnit.MILLISECONDS);
-  
-    scheduledExecutorService.scheduleAtFixedRate(() -> 
-    threadService.execute(() -> differentDcShared()), 
-    0, differentDatacenterGossipIntervalMs, TimeUnit.MILLISECONDS);
-    
-    //the dead
-    scheduledExecutorService.scheduleAtFixedRate(() -> 
-      threadService.execute(() -> sendToDeadMember()), 
-      0, randomDeadMemberSendIntervalMs, TimeUnit.MILLISECONDS);
-    
-  }
-
-  private void sendToDeadMember() {
-    sendMembershipList(gossipManager.getMyself(), 
selectPartner(gossipManager.getDeadMembers()));
-  }
-  
-  private List<LocalMember> differentDataCenter(){
-    String myDc = gossipManager.getMyself().getProperties().get(DATACENTER);
-    String rack = gossipManager.getMyself().getProperties().get(RACK);
-    if (myDc == null|| rack == null){
-      return Collections.emptyList();
-    }
-    List<LocalMember> notMyDc = new ArrayList<LocalMember>(10);
-    for (LocalMember i : gossipManager.getLiveMembers()){
-      if (!myDc.equals(i.getProperties().get(DATACENTER))){
-        notMyDc.add(i);
-      }
-    }
-    return notMyDc;
-  }
-  
-  private List<LocalMember> sameDatacenterDifferentRack(){
-    String myDc = gossipManager.getMyself().getProperties().get(DATACENTER);
-    String rack = gossipManager.getMyself().getProperties().get(RACK);
-    if (myDc == null|| rack == null){
-      return Collections.emptyList();
-    }
-    List<LocalMember> notMyDc = new ArrayList<LocalMember>(10);
-    for (LocalMember i : gossipManager.getLiveMembers()){
-      if (myDc.equals(i.getProperties().get(DATACENTER)) && 
!rack.equals(i.getProperties().get(RACK))){
-        notMyDc.add(i);
-      }
-    }
-    return notMyDc;
-  }
-    
-  private List<LocalMember> sameRackNodes(){
-    String myDc = gossipManager.getMyself().getProperties().get(DATACENTER);
-    String rack = gossipManager.getMyself().getProperties().get(RACK);
-    if (myDc == null|| rack == null){
-      return Collections.emptyList();
-    }
-    List<LocalMember> sameDcAndRack = new ArrayList<LocalMember>(10);
-    for (LocalMember i : gossipManager.getLiveMembers()){
-      if (myDc.equals(i.getProperties().get(DATACENTER))
-              && rack.equals(i.getProperties().get(RACK))){
-        sameDcAndRack.add(i);
-      }
-    }
-    return sameDcAndRack;
-  }
-
-  private void sendToSameRackMember() {
-    LocalMember i = selectPartner(sameRackNodes());
-    sendMembershipList(gossipManager.getMyself(), i);
-  }
-  
-  private void sendToSameRackMemberPerNode() {
-    sendPerNodeData(gossipManager.getMyself(), selectPartner(sameRackNodes()));
-  }
-  
-  private void sendToSameRackShared() {
-    sendSharedData(gossipManager.getMyself(), selectPartner(sameRackNodes()));
-  }
-  
-  private void differentDcMember() {
-    sendMembershipList(gossipManager.getMyself(), 
selectPartner(differentDataCenter()));
-  }
-  
-  private void differentDcPerNode() {
-    sendPerNodeData(gossipManager.getMyself(), 
selectPartner(differentDataCenter()));
-  }
-  
-  private void differentDcShared() {
-    sendSharedData(gossipManager.getMyself(), 
selectPartner(differentDataCenter()));
-  }
-  
-  private void sameDcDiffernetRackMember() {
-    sendMembershipList(gossipManager.getMyself(), 
selectPartner(sameDatacenterDifferentRack()));
-  }
-  
-  private void sameDcDiffernetRackPerNode() {
-    sendPerNodeData(gossipManager.getMyself(), 
selectPartner(sameDatacenterDifferentRack()));
-  }
-  
-  private void sameDcDiffernetRackShared() {
-    sendSharedData(gossipManager.getMyself(), 
selectPartner(sameDatacenterDifferentRack()));
-  }
-  
-  @Override
-  public void shutdown() {
-    super.shutdown();
-    scheduledExecutorService.shutdown();
-    try {
-      scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
-    } catch (InterruptedException e) {
-      LOGGER.debug("Issue during shutdown", e);
-    }
-    sendShutdownMessage();
-    threadService.shutdown();
-    try {
-      threadService.awaitTermination(5, TimeUnit.SECONDS);
-    } catch (InterruptedException e) {
-      LOGGER.debug("Issue during shutdown", e);
-    }
-  }
-  
-  /**
-   * sends an optimistic shutdown message to several clusters nodes
-   */
-  protected void sendShutdownMessage(){
-    List<LocalMember> l = gossipManager.getLiveMembers();
-    int sendTo = l.size() < 3 ? 1 : l.size() / 3;
-    for (int i = 0; i < sendTo; i++) {
-      threadService.execute(() -> 
sendShutdownMessage(gossipManager.getMyself(), selectPartner(l)));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/src/main/java/org/apache/gossip/manager/GossipCore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java 
b/src/main/java/org/apache/gossip/manager/GossipCore.java
deleted file mode 100644
index f53419d..0000000
--- a/src/main/java/org/apache/gossip/manager/GossipCore.java
+++ /dev/null
@@ -1,387 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.manager;
-
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.MetricRegistry;
-import org.apache.gossip.Member;
-import org.apache.gossip.LocalMember;
-import org.apache.gossip.RemoteMember;
-import org.apache.gossip.crdt.Crdt;
-import org.apache.gossip.event.GossipState;
-import org.apache.gossip.model.*;
-import org.apache.gossip.udp.Trackable;
-import org.apache.log4j.Logger;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetAddress;
-import java.net.URI;
-import java.security.*;
-import java.security.spec.InvalidKeySpecException;
-import java.security.spec.PKCS8EncodedKeySpec;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.*;
-
-public class GossipCore implements GossipCoreConstants {
-
-  class LatchAndBase {
-    private final CountDownLatch latch;
-    private volatile Base base;
-    
-    LatchAndBase(){
-      latch = new CountDownLatch(1);
-    }
-    
-  }
-  public static final Logger LOGGER = Logger.getLogger(GossipCore.class);
-  private final GossipManager gossipManager;
-  private ConcurrentHashMap<String, LatchAndBase> requests;
-  private ThreadPoolExecutor service;
-  private final ConcurrentHashMap<String, ConcurrentHashMap<String, 
PerNodeDataMessage>> perNodeData;
-  private final ConcurrentHashMap<String, SharedDataMessage> sharedData;
-  private final BlockingQueue<Runnable> workQueue;
-  private final PKCS8EncodedKeySpec privKeySpec;
-  private final PrivateKey privKey;
-  private final Meter messageSerdeException;
-  private final Meter tranmissionException;
-  private final Meter tranmissionSuccess;
-
-  public GossipCore(GossipManager manager, MetricRegistry metrics){
-    this.gossipManager = manager;
-    requests = new ConcurrentHashMap<>();
-    workQueue = new ArrayBlockingQueue<>(1024);
-    service = new ThreadPoolExecutor(1, 5, 1, TimeUnit.SECONDS, workQueue, new 
ThreadPoolExecutor.DiscardOldestPolicy());
-    perNodeData = new ConcurrentHashMap<>();
-    sharedData = new ConcurrentHashMap<>();
-    metrics.register(WORKQUEUE_SIZE, (Gauge<Integer>)() -> workQueue.size());
-    metrics.register(PER_NODE_DATA_SIZE, (Gauge<Integer>)() -> 
perNodeData.size());
-    metrics.register(SHARED_DATA_SIZE, (Gauge<Integer>)() ->  
sharedData.size());
-    metrics.register(REQUEST_SIZE, (Gauge<Integer>)() ->  requests.size());
-    metrics.register(THREADPOOL_ACTIVE, (Gauge<Integer>)() ->  
service.getActiveCount());
-    metrics.register(THREADPOOL_SIZE, (Gauge<Integer>)() ->  
service.getPoolSize());
-    messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION);
-    tranmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION);
-    tranmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS);
-
-    if (manager.getSettings().isSignMessages()){
-      File privateKey = new File(manager.getSettings().getPathToKeyStore(), 
manager.getMyself().getId());
-      File publicKey = new File(manager.getSettings().getPathToKeyStore(), 
manager.getMyself().getId() + ".pub");
-      if (!privateKey.exists()){
-        throw new IllegalArgumentException("private key not found " + 
privateKey);
-      }
-      if (!publicKey.exists()){
-        throw new IllegalArgumentException("public key not found " + 
publicKey);
-      }
-      try (FileInputStream keyfis = new FileInputStream(privateKey)) {
-        byte[] encKey = new byte[keyfis.available()];
-        keyfis.read(encKey);
-        keyfis.close();
-        privKeySpec = new PKCS8EncodedKeySpec(encKey);
-        KeyFactory keyFactory = KeyFactory.getInstance("DSA");
-        privKey = keyFactory.generatePrivate(privKeySpec);
-      } catch (NoSuchAlgorithmException | InvalidKeySpecException | 
IOException e) {
-        throw new RuntimeException("failed hard", e);
-      }
-    } else {
-      privKeySpec = null;
-      privKey = null;
-    }
-  }
-
-  private byte [] sign(byte [] bytes){
-    Signature dsa;
-    try {
-      dsa = Signature.getInstance("SHA1withDSA", "SUN");
-      dsa.initSign(privKey);
-      dsa.update(bytes);
-      return dsa.sign();
-    } catch (NoSuchAlgorithmException | NoSuchProviderException | 
InvalidKeyException | SignatureException e) {
-      throw new RuntimeException(e);
-    } 
-  }
-
-  @SuppressWarnings({ "unchecked", "rawtypes" })
-  public void addSharedData(SharedDataMessage message) {
-    while (true){
-      SharedDataMessage previous = sharedData.putIfAbsent(message.getKey(), 
message);
-      if (previous == null){
-        return;
-      }
-      if (message.getPayload() instanceof Crdt){
-        SharedDataMessage merged = new SharedDataMessage();
-        merged.setExpireAt(message.getExpireAt());
-        merged.setKey(message.getKey());
-        merged.setNodeId(message.getNodeId());
-        merged.setTimestamp(message.getTimestamp());
-        Crdt mergedCrdt = ((Crdt) previous.getPayload()).merge((Crdt) 
message.getPayload());
-        merged.setPayload(mergedCrdt);
-        boolean replaced = sharedData.replace(message.getKey(), previous, 
merged);
-        if (replaced){
-          return;
-        }
-      } else {
-        if (previous.getTimestamp() < message.getTimestamp()){
-          boolean result = sharedData.replace(message.getKey(), previous, 
message);
-          if (result){
-            return;
-          }
-        } else {
-          return;
-        }
-      }
-    }
-  }
-  
-  public void addPerNodeData(PerNodeDataMessage message){
-    ConcurrentHashMap<String,PerNodeDataMessage> nodeMap = new 
ConcurrentHashMap<>();
-    nodeMap.put(message.getKey(), message);
-    nodeMap = perNodeData.putIfAbsent(message.getNodeId(), nodeMap);
-    if (nodeMap != null){
-      PerNodeDataMessage current = nodeMap.get(message.getKey());
-      if (current == null){
-        nodeMap.putIfAbsent(message.getKey(), message);
-      } else {
-        if (current.getTimestamp() < message.getTimestamp()){
-          nodeMap.replace(message.getKey(), current, message);
-        }
-      }
-    }
-  }
-
-  public ConcurrentHashMap<String, ConcurrentHashMap<String, 
PerNodeDataMessage>> getPerNodeData(){
-    return perNodeData;
-  }
-
-  public ConcurrentHashMap<String, SharedDataMessage> getSharedData() {
-    return sharedData;
-  }
-
-  public void shutdown(){
-    service.shutdown();
-    try {
-      service.awaitTermination(1, TimeUnit.SECONDS);
-    } catch (InterruptedException e) {
-      LOGGER.warn(e);
-    }
-    service.shutdownNow();
-  }
-
-  public void receive(Base base) {
-    if (!gossipManager.getMessageInvoker().invoke(this, gossipManager, base)) {
-      LOGGER.warn("received message can not be handled");
-    }
-  }
-
-  /**
-   * Sends a blocking message.
-   * @param message
-   * @param uri
-   * @throws RuntimeException if data can not be serialized or in transmission 
error
-   */
-  private void sendInternal(Base message, URI uri){
-    byte[] json_bytes;
-    try {
-      if (privKey == null){
-        json_bytes = 
gossipManager.getObjectMapper().writeValueAsBytes(message);
-      } else {
-        SignedPayload p = new SignedPayload();
-        
p.setData(gossipManager.getObjectMapper().writeValueAsString(message).getBytes());
-        p.setSignature(sign(p.getData()));
-        json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(p);
-      }
-    } catch (IOException e) {
-      messageSerdeException.mark();
-      throw new RuntimeException(e);
-    }
-    try (DatagramSocket socket = new DatagramSocket()) {
-      socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2);
-      InetAddress dest = InetAddress.getByName(uri.getHost());
-      DatagramPacket datagramPacket = new DatagramPacket(json_bytes, 
json_bytes.length, dest, uri.getPort());
-      socket.send(datagramPacket);
-      tranmissionSuccess.mark();
-    } catch (IOException e) {
-      tranmissionException.mark();
-      throw new RuntimeException(e);
-    }
-  }
-
-  public Response send(Base message, URI uri){
-    if (LOGGER.isDebugEnabled()){
-      LOGGER.debug("Sending " + message);
-      LOGGER.debug("Current request queue " + requests);
-    }
-
-    final Trackable t;
-    LatchAndBase latchAndBase = null;
-    if (message instanceof Trackable){
-      t = (Trackable) message;
-      latchAndBase = new LatchAndBase();
-      requests.put(t.getUuid() + "/" + t.getUriFrom(), latchAndBase);
-    } else {
-      t = null;
-    }
-    sendInternal(message, uri);
-    if (latchAndBase == null){
-      return null;
-    } 
-    
-    try {
-      boolean complete = latchAndBase.latch.await(1, TimeUnit.SECONDS);
-      if (complete){
-        return (Response) latchAndBase.base;
-      } else{
-        return null;
-      }
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    } finally {
-      if (latchAndBase != null){
-        requests.remove(t.getUuid() + "/" + t.getUriFrom());
-      }
-    }
-  }
-
-  /**
-   * Sends a message across the network while blocking. Catches and ignores 
IOException in transmission. Used
-   * when the protocol for the message is not to wait for a response
-   * @param message the message to send
-   * @param u the uri to send it to
-   */
-  public void sendOneWay(Base message, URI u){
-    byte[] json_bytes;
-    try {
-      if (privKey == null){
-        json_bytes = 
gossipManager.getObjectMapper().writeValueAsBytes(message);
-      } else {
-        SignedPayload p = new SignedPayload();
-        
p.setData(gossipManager.getObjectMapper().writeValueAsString(message).getBytes());
-        p.setSignature(sign(p.getData()));
-        json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(p);
-      }
-    } catch (IOException e) {
-      messageSerdeException.mark();
-      throw new RuntimeException(e);
-    }
-    try (DatagramSocket socket = new DatagramSocket()) {
-      socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2);
-      InetAddress dest = InetAddress.getByName(u.getHost());
-      DatagramPacket datagramPacket = new DatagramPacket(json_bytes, 
json_bytes.length, dest, u.getPort());
-      socket.send(datagramPacket);
-      tranmissionSuccess.mark();
-    } catch (IOException ex) {
-      tranmissionException.mark();
-      LOGGER.debug("Send one way failed", ex);
-    }
-  }
-
-  public void handleResponse(String k, Base v) {
-    LatchAndBase latch = requests.get(k);
-    latch.base = v;
-    latch.latch.countDown();
-  }
-
-  /**
-   * Merge lists from remote members and update heartbeats
-   *
-   * @param gossipManager
-   * @param senderMember
-   * @param remoteList
-   *
-   */
-  public void mergeLists(GossipManager gossipManager, RemoteMember 
senderMember,
-          List<Member> remoteList) {
-    if (LOGGER.isDebugEnabled()){
-      debugState(senderMember, remoteList);
-    }
-    for (LocalMember i : gossipManager.getDeadMembers()) {
-      if (i.getId().equals(senderMember.getId())) {
-        LOGGER.debug(gossipManager.getMyself() + " contacted by dead member " 
+ senderMember.getUri());
-        i.recordHeartbeat(senderMember.getHeartbeat());
-        i.setHeartbeat(senderMember.getHeartbeat());
-        //TODO consider forcing an UP here
-      }
-    }
-    for (Member remoteMember : remoteList) {
-      if (remoteMember.getId().equals(gossipManager.getMyself().getId())) {
-        continue;
-      }
-      LocalMember aNewMember = new LocalMember(remoteMember.getClusterName(),
-      remoteMember.getUri(),
-      remoteMember.getId(),
-      remoteMember.getHeartbeat(),
-      remoteMember.getProperties(),
-      gossipManager.getSettings().getWindowSize(),
-      gossipManager.getSettings().getMinimumSamples(),
-      gossipManager.getSettings().getDistribution());
-      aNewMember.recordHeartbeat(remoteMember.getHeartbeat());
-      Object result = gossipManager.getMembers().putIfAbsent(aNewMember, 
GossipState.UP);
-      if (result != null){
-        for (Entry<LocalMember, GossipState> localMember : 
gossipManager.getMembers().entrySet()){
-          if (localMember.getKey().getId().equals(remoteMember.getId())){
-            localMember.getKey().recordHeartbeat(remoteMember.getHeartbeat());
-            localMember.getKey().setHeartbeat(remoteMember.getHeartbeat());
-            localMember.getKey().setProperties(remoteMember.getProperties());
-          }
-        }
-      }
-    }
-    if (LOGGER.isDebugEnabled()){
-      debugState(senderMember, remoteList);
-    }
-  }
-
-  private void debugState(RemoteMember senderMember,
-          List<Member> remoteList){
-    LOGGER.warn(
-          "-----------------------\n" +
-          "Me " + gossipManager.getMyself() + "\n" +
-          "Sender " + senderMember + "\n" +
-          "RemoteList " + remoteList + "\n" +
-          "Live " + gossipManager.getLiveMembers()+ "\n" +
-          "Dead " + gossipManager.getDeadMembers()+ "\n" +
-          "=======================");
-  }
-
-  @SuppressWarnings("rawtypes")
-  public Crdt merge(SharedDataMessage message) {
-    for (;;){
-      SharedDataMessage previous = sharedData.putIfAbsent(message.getKey(), 
message);
-      if (previous == null){
-        return (Crdt) message.getPayload();
-      }
-      SharedDataMessage copy = new SharedDataMessage();
-      copy.setExpireAt(message.getExpireAt());
-      copy.setKey(message.getKey());
-      copy.setNodeId(message.getNodeId());
-      copy.setTimestamp(message.getTimestamp());
-      @SuppressWarnings("unchecked")
-      Crdt merged = ((Crdt) previous.getPayload()).merge((Crdt) 
message.getPayload());
-      copy.setPayload(merged);
-      boolean replaced = sharedData.replace(message.getKey(), previous, copy);
-      if (replaced){
-        return merged;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/src/main/java/org/apache/gossip/manager/GossipCoreConstants.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipCoreConstants.java 
b/src/main/java/org/apache/gossip/manager/GossipCoreConstants.java
deleted file mode 100644
index 6d3765a..0000000
--- a/src/main/java/org/apache/gossip/manager/GossipCoreConstants.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.manager;
-
-public interface GossipCoreConstants {
-  String WORKQUEUE_SIZE = "gossip.core.workqueue.size";
-  String PER_NODE_DATA_SIZE = "gossip.core.pernodedata.size"; 
-  String SHARED_DATA_SIZE = "gossip.core.shareddata.size";
-  String REQUEST_SIZE = "gossip.core.requests.size";
-  String THREADPOOL_ACTIVE = "gossip.core.threadpool.active";
-  String THREADPOOL_SIZE = "gossip.core.threadpool.size";
-  String MESSAGE_SERDE_EXCEPTION = "gossip.core.message_serde_exception";
-  String MESSAGE_TRANSMISSION_EXCEPTION = 
"gossip.core.message_transmission_exception";
-  String MESSAGE_TRANSMISSION_SUCCESS = 
"gossip.core.message_transmission_success";
-}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/src/main/java/org/apache/gossip/manager/GossipManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java 
b/src/main/java/org/apache/gossip/manager/GossipManager.java
deleted file mode 100644
index c2b50ae..0000000
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ /dev/null
@@ -1,319 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.manager;
-
-import com.codahale.metrics.MetricRegistry;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.gossip.GossipSettings;
-import org.apache.gossip.LocalMember;
-import org.apache.gossip.Member;
-import org.apache.gossip.crdt.Crdt;
-import org.apache.gossip.event.GossipListener;
-import org.apache.gossip.event.GossipState;
-import org.apache.gossip.manager.handlers.MessageInvoker;
-import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
-import org.apache.gossip.model.PerNodeDataMessage;
-import org.apache.gossip.model.SharedDataMessage;
-import org.apache.log4j.Logger;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.net.URI;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-
-public abstract class GossipManager {
-
-  public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
-
-  private final ConcurrentSkipListMap<LocalMember, GossipState> members;
-  private final LocalMember me;
-  private final GossipSettings settings;
-  private final AtomicBoolean gossipServiceRunning;
-  private AbstractActiveGossiper activeGossipThread;
-  private PassiveGossipThread passiveGossipThread;
-  private ExecutorService gossipThreadExecutor;
-  private final GossipCore gossipCore;
-  private final DataReaper dataReaper;
-  private final Clock clock;
-  private final ScheduledExecutorService scheduledServiced;
-  private final MetricRegistry registry;
-  private final RingStatePersister ringState;
-  private final UserDataPersister userDataState;
-  private final GossipMemberStateRefresher memberStateRefresher;
-  private final ObjectMapper objectMapper;
-
-  private final MessageInvoker messageInvoker;
-
-  public GossipManager(String cluster,
-                       URI uri, String id, Map<String, String> properties, 
GossipSettings settings,
-                       List<Member> gossipMembers, GossipListener listener, 
MetricRegistry registry,
-                       ObjectMapper objectMapper, MessageInvoker 
messageInvoker) {
-    this.settings = settings;
-    this.messageInvoker = messageInvoker;
-
-    clock = new SystemClock();
-    me = new LocalMember(cluster, uri, id, clock.nanoTime(), properties,
-            settings.getWindowSize(), settings.getMinimumSamples(), 
settings.getDistribution());
-    gossipCore = new GossipCore(this, registry);
-    dataReaper = new DataReaper(gossipCore, clock);
-    members = new ConcurrentSkipListMap<>();
-    for (Member startupMember : gossipMembers) {
-      if (!startupMember.equals(me)) {
-        LocalMember member = new LocalMember(startupMember.getClusterName(),
-                startupMember.getUri(), startupMember.getId(),
-                clock.nanoTime(), startupMember.getProperties(), 
settings.getWindowSize(),
-                settings.getMinimumSamples(), settings.getDistribution());
-        //TODO should members start in down state?
-        members.put(member, GossipState.DOWN);
-      }
-    }
-    gossipThreadExecutor = Executors.newCachedThreadPool();
-    gossipServiceRunning = new AtomicBoolean(true);
-    this.scheduledServiced = Executors.newScheduledThreadPool(1);
-    this.registry = registry;
-    this.ringState = new RingStatePersister(this);
-    this.userDataState = new UserDataPersister(this, this.gossipCore);
-    this.memberStateRefresher = new GossipMemberStateRefresher(members, 
settings, listener, this::findPerNodeGossipData);
-    this.objectMapper = objectMapper;
-    readSavedRingState();
-    readSavedDataState();
-  }
-
-  public MessageInvoker getMessageInvoker() {
-    return messageInvoker;
-  }
-
-  public ConcurrentSkipListMap<LocalMember, GossipState> getMembers() {
-    return members;
-  }
-
-  public GossipSettings getSettings() {
-    return settings;
-  }
-
-  /**
-   * @return a read only list of members found in the DOWN state.
-   */
-  public List<LocalMember> getDeadMembers() {
-    return Collections.unmodifiableList(
-            members.entrySet()
-                    .stream()
-                    .filter(entry -> GossipState.DOWN.equals(entry.getValue()))
-                    .map(Entry::getKey).collect(Collectors.toList()));
-  }
-
-  /**
-   *
-   * @return a read only list of members found in the UP state
-   */
-  public List<LocalMember> getLiveMembers() {
-    return Collections.unmodifiableList(
-            members.entrySet()
-                    .stream()
-                    .filter(entry -> GossipState.UP.equals(entry.getValue()))
-                    .map(Entry::getKey).collect(Collectors.toList()));
-  }
-
-  public LocalMember getMyself() {
-    return me;
-  }
-
-  private AbstractActiveGossiper constructActiveGossiper(){
-    try {
-      Constructor<?> c = 
Class.forName(settings.getActiveGossipClass()).getConstructor(GossipManager.class,
 GossipCore.class, MetricRegistry.class);
-      return (AbstractActiveGossiper) c.newInstance(this, gossipCore, 
registry);
-    } catch (NoSuchMethodException | SecurityException | 
ClassNotFoundException | InstantiationException | IllegalAccessException | 
IllegalArgumentException | InvocationTargetException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * Starts the client. Specifically, start the various cycles for this 
protocol. Start the gossip
-   * thread and start the receiver thread.
-   */
-  public void init() {
-    passiveGossipThread = new OnlyProcessReceivedPassiveGossipThread(this, 
gossipCore);
-    gossipThreadExecutor.execute(passiveGossipThread);
-    activeGossipThread = constructActiveGossiper();
-    activeGossipThread.init();
-    dataReaper.init();
-    scheduledServiced.scheduleAtFixedRate(ringState, 60, 60, TimeUnit.SECONDS);
-    scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, 
TimeUnit.SECONDS);
-    scheduledServiced.scheduleAtFixedRate(memberStateRefresher, 0, 100, 
TimeUnit.MILLISECONDS);
-    LOGGER.debug("The GossipManager is started.");
-  }
-
-  private void readSavedRingState() {
-    for (LocalMember l : ringState.readFromDisk()){
-      LocalMember member = new LocalMember(l.getClusterName(),
-              l.getUri(), l.getId(),
-              clock.nanoTime(), l.getProperties(), settings.getWindowSize(),
-              settings.getMinimumSamples(), settings.getDistribution());
-      members.putIfAbsent(member, GossipState.DOWN);
-    }
-  }
-
-  private void readSavedDataState() {
-    for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> l : 
userDataState.readPerNodeFromDisk().entrySet()){
-      for (Entry<String, PerNodeDataMessage> j : l.getValue().entrySet()){
-        gossipCore.addPerNodeData(j.getValue());
-      }
-    }
-    for (Entry<String, SharedDataMessage> l: 
userDataState.readSharedDataFromDisk().entrySet()){
-      gossipCore.addSharedData(l.getValue());
-    }
-  }
-
-  /**
-   * Shutdown the gossip service.
-   */
-  public void shutdown() {
-    gossipServiceRunning.set(false);
-    gossipThreadExecutor.shutdown();
-    gossipCore.shutdown();
-    dataReaper.close();
-    if (passiveGossipThread != null) {
-      passiveGossipThread.shutdown();
-    }
-    if (activeGossipThread != null) {
-      activeGossipThread.shutdown();
-    }
-    try {
-      boolean result = gossipThreadExecutor.awaitTermination(10, 
TimeUnit.MILLISECONDS);
-      if (!result) {
-        LOGGER.error("executor shutdown timed out");
-      }
-    } catch (InterruptedException e) {
-      LOGGER.error(e);
-    }
-    gossipThreadExecutor.shutdownNow();
-    scheduledServiced.shutdown();
-    try {
-      scheduledServiced.awaitTermination(1, TimeUnit.SECONDS);
-    } catch (InterruptedException e) {
-      LOGGER.error(e);
-    }
-    scheduledServiced.shutdownNow();
-  }
-
-  public void gossipPerNodeData(PerNodeDataMessage message){
-    Objects.nonNull(message.getKey());
-    Objects.nonNull(message.getTimestamp());
-    Objects.nonNull(message.getPayload());
-    message.setNodeId(me.getId());
-    gossipCore.addPerNodeData(message);
-  }
-
-  public void gossipSharedData(SharedDataMessage message){
-    Objects.nonNull(message.getKey());
-    Objects.nonNull(message.getTimestamp());
-    Objects.nonNull(message.getPayload());
-    message.setNodeId(me.getId());
-    gossipCore.addSharedData(message);
-  }
-
-
-  @SuppressWarnings("rawtypes")
-  public Crdt findCrdt(String key){
-    SharedDataMessage l = gossipCore.getSharedData().get(key);
-    if (l == null){
-      return null;
-    }
-    if (l.getExpireAt() < clock.currentTimeMillis()){
-      return null;
-    } else {
-      return (Crdt) l.getPayload();
-    }
-  }
-
-  @SuppressWarnings("rawtypes")
-  public Crdt merge(SharedDataMessage message){
-    Objects.nonNull(message.getKey());
-    Objects.nonNull(message.getTimestamp());
-    Objects.nonNull(message.getPayload());
-    message.setNodeId(me.getId());
-    if (! (message.getPayload() instanceof Crdt)){
-      throw new IllegalArgumentException("Not a subclass of CRDT " + 
message.getPayload());
-    }
-    return gossipCore.merge(message);
-  }
-
-  public PerNodeDataMessage findPerNodeGossipData(String nodeId, String key){
-    ConcurrentHashMap<String, PerNodeDataMessage> j = 
gossipCore.getPerNodeData().get(nodeId);
-    if (j == null){
-      return null;
-    } else {
-      PerNodeDataMessage l = j.get(key);
-      if (l == null){
-        return null;
-      }
-      if (l.getExpireAt() != null && l.getExpireAt() < 
clock.currentTimeMillis()) {
-        return null;
-      }
-      return l;
-    }
-  }
-
-  public SharedDataMessage findSharedGossipData(String key){
-    SharedDataMessage l = gossipCore.getSharedData().get(key);
-    if (l == null){
-      return null;
-    }
-    if (l.getExpireAt() < clock.currentTimeMillis()){
-      return null;
-    } else {
-      return l;
-    }
-  }
-
-  public DataReaper getDataReaper() {
-    return dataReaper;
-  }
-
-  public RingStatePersister getRingState() {
-    return ringState;
-  }
-
-  public UserDataPersister getUserDataState() {
-    return userDataState;
-  }
-
-  public GossipMemberStateRefresher getMemberStateRefresher() {
-    return memberStateRefresher;
-  }
-
-  public Clock getClock() {
-    return clock;
-  }
-
-  public ObjectMapper getObjectMapper() {
-    return objectMapper;
-  }
-
-  public MetricRegistry getRegistry() {
-    return registry;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java 
b/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
deleted file mode 100644
index b87045b..0000000
--- a/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.manager;
-
-import com.codahale.metrics.MetricRegistry;
-import com.fasterxml.jackson.core.JsonGenerator.Feature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.gossip.Member;
-import org.apache.gossip.GossipSettings;
-import org.apache.gossip.StartupSettings;
-import org.apache.gossip.crdt.CrdtModule;
-import org.apache.gossip.event.GossipListener;
-import org.apache.gossip.manager.handlers.DefaultMessageInvoker;
-import org.apache.gossip.manager.handlers.MessageInvoker;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class GossipManagerBuilder {
-
-  public static ManagerBuilder newBuilder() {
-    return new ManagerBuilder();
-  }
-
-  public static final class ManagerBuilder {
-    private String cluster;
-    private URI uri;
-    private String id;
-    private GossipSettings settings;
-    private List<Member> gossipMembers;
-    private GossipListener listener;
-    private MetricRegistry registry;
-    private Map<String,String> properties;
-    private ObjectMapper objectMapper;
-    private MessageInvoker messageInvoker;
-
-    private ManagerBuilder() {}
-
-    private void checkArgument(boolean check, String msg) {
-      if (!check) {
-        throw new IllegalArgumentException(msg);
-      }
-    }
-
-    public ManagerBuilder cluster(String cluster) {
-      this.cluster = cluster;
-      return this;
-    }
-    
-    public ManagerBuilder properties(Map<String,String> properties) {
-      this.properties = properties;
-      return this;
-    }
-
-    public ManagerBuilder id(String id) {
-      this.id = id;
-      return this;
-    }
-
-    public ManagerBuilder gossipSettings(GossipSettings settings) {
-      this.settings = settings;
-      return this;
-    }
-    
-    public ManagerBuilder startupSettings(StartupSettings startupSettings) {
-      this.cluster = startupSettings.getCluster();
-      this.id = startupSettings.getId();
-      this.settings = startupSettings.getGossipSettings();
-      this.gossipMembers = startupSettings.getGossipMembers();
-      this.uri = startupSettings.getUri();
-      return this;
-    }
-
-    public ManagerBuilder gossipMembers(List<Member> members) {
-      this.gossipMembers = members;
-      return this;
-    }
-
-    public ManagerBuilder listener(GossipListener listener) {
-      this.listener = listener;
-      return this;
-    }
-    
-    public ManagerBuilder registry(MetricRegistry registry) {
-      this.registry = registry;
-      return this;
-    }
-
-    public ManagerBuilder uri(URI uri){
-      this.uri = uri;
-      return this;
-    }
-    
-    public ManagerBuilder mapper(ObjectMapper objectMapper){
-      this.objectMapper = objectMapper;
-      return this;
-    }
-
-    public ManagerBuilder messageInvoker(MessageInvoker messageInvoker) {
-      this.messageInvoker = messageInvoker;
-      return this;
-    }
-
-    public GossipManager build() {
-      checkArgument(id != null, "You must specify an id");
-      checkArgument(cluster != null, "You must specify a cluster name");
-      checkArgument(settings != null, "You must specify gossip settings");
-      checkArgument(uri != null, "You must specify a uri");
-      if (registry == null){
-        registry = new MetricRegistry();
-      }
-      if (properties == null){
-        properties = new HashMap<String,String>();
-      }
-      if (listener == null){
-        listener((a,b) -> {});
-      }
-      if (gossipMembers == null) {
-        gossipMembers = new ArrayList<>();
-      }
-      if (objectMapper == null) {
-        objectMapper = new ObjectMapper();
-        objectMapper.enableDefaultTyping();
-        objectMapper.registerModule(new CrdtModule());
-        objectMapper.configure(Feature.WRITE_NUMBERS_AS_STRINGS, false);
-      }
-      if (messageInvoker == null) {
-        messageInvoker = new DefaultMessageInvoker();
-      } 
-      return new GossipManager(cluster, uri, id, properties, settings, 
gossipMembers, listener, registry, objectMapper, messageInvoker) {} ;
-    }
-  }
-
-}

Reply via email to