http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/src/main/java/org/apache/gossip/crdt/GrowOnlyCounter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/crdt/GrowOnlyCounter.java b/src/main/java/org/apache/gossip/crdt/GrowOnlyCounter.java deleted file mode 100644 index dd1505a..0000000 --- a/src/main/java/org/apache/gossip/crdt/GrowOnlyCounter.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gossip.crdt; - -import org.apache.gossip.manager.GossipManager; - -import java.util.HashMap; -import java.util.Map; - -public class GrowOnlyCounter implements CrdtCounter<Long, GrowOnlyCounter> { - - private final Map<String, Long> counters = new HashMap<>(); - - GrowOnlyCounter(Map<String, Long> counters) { - this.counters.putAll(counters); - } - - public GrowOnlyCounter(GrowOnlyCounter growOnlyCounter, Builder builder) { - counters.putAll(growOnlyCounter.counters); - if (counters.containsKey(builder.myId)) { - Long newValue = counters.get(builder.myId) + builder.counter; - counters.replace(builder.myId, newValue); - } else { - counters.put(builder.myId, builder.counter); - } - } - - public GrowOnlyCounter(Builder builder) { - counters.put(builder.myId, builder.counter); - } - - public GrowOnlyCounter(GossipManager manager) { - counters.put(manager.getMyself().getId(), 0L); - } - - public GrowOnlyCounter(GrowOnlyCounter growOnlyCounter, GrowOnlyCounter other) { - counters.putAll(growOnlyCounter.counters); - for (Map.Entry<String, Long> entry : other.counters.entrySet()) { - String otherKey = entry.getKey(); - Long otherValue = entry.getValue(); - - if (counters.containsKey(otherKey)) { - Long newValue = Math.max(counters.get(otherKey), otherValue); - counters.replace(otherKey, newValue); - } else { - counters.put(otherKey, otherValue); - } - } - } - - @Override - public GrowOnlyCounter merge(GrowOnlyCounter other) { - return new GrowOnlyCounter(this, other); - } - - @Override - public Long value() { - Long globalCount = 0L; - for (Long increment : counters.values()) { - globalCount += increment; - } - return globalCount; - } - - @Override - public GrowOnlyCounter optimize() { - return new GrowOnlyCounter(counters); - } - - @Override - public boolean equals(Object obj) { - if (getClass() != obj.getClass()) - return false; - GrowOnlyCounter other = (GrowOnlyCounter) obj; - return value().longValue() == other.value().longValue(); - } - - 
@Override - public String toString() { - return "GrowOnlyCounter [counters= " + counters + ", Value=" + value() + "]"; - } - - Map<String, Long> getCounters() { - return counters; - } - - public static class Builder { - - private final String myId; - - private Long counter; - - public Builder(GossipManager gossipManager) { - myId = gossipManager.getMyself().getId(); - counter = 0L; - } - - public GrowOnlyCounter.Builder increment(Long count) { - counter += count; - return this; - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/src/main/java/org/apache/gossip/crdt/GrowOnlySet.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/crdt/GrowOnlySet.java b/src/main/java/org/apache/gossip/crdt/GrowOnlySet.java deleted file mode 100644 index 9e2dd49..0000000 --- a/src/main/java/org/apache/gossip/crdt/GrowOnlySet.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.gossip.crdt; - -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashSet; -import java.util.Set; - -public class GrowOnlySet<ElementType> implements CrdtSet<ElementType, Set<ElementType>, GrowOnlySet<ElementType>>{ - - private final Set<ElementType> hidden = new LinkedHashSet<>(); - - @SuppressWarnings("unused") - /* - * Used by SerDe - */ - private GrowOnlySet(){ - - } - - public GrowOnlySet(Set<ElementType> c){ - hidden.addAll(c); - } - - public GrowOnlySet(Collection<ElementType> c){ - hidden.addAll(c); - } - - public GrowOnlySet(GrowOnlySet<ElementType> first, GrowOnlySet<ElementType> second){ - hidden.addAll(first.value()); - hidden.addAll(second.value()); - } - - @Override - public GrowOnlySet<ElementType> merge(GrowOnlySet<ElementType> other) { - return new GrowOnlySet<>(this, other); - } - - @Override - public Set<ElementType> value() { - Set<ElementType> copy = new LinkedHashSet<>(); - copy.addAll(hidden); - return Collections.unmodifiableSet(copy); - } - - @Override - public GrowOnlySet<ElementType> optimize() { - return new GrowOnlySet<>(hidden); - } - - public int size() { - return hidden.size(); - } - - public boolean isEmpty() { - return hidden.isEmpty(); - } - - public boolean contains(Object o) { - return hidden.contains(o); - } - - public Iterator<ElementType> iterator() { - Set<ElementType> copy = new HashSet<>(); - copy.addAll(hidden); - return copy.iterator(); - } - - public Object[] toArray() { - return hidden.toArray(); - } - - public <T> T[] toArray(T[] a) { - return hidden.toArray(a); - } - - public boolean add(ElementType e) { - throw new UnsupportedOperationException(); - } - - public boolean remove(Object o) { - throw new UnsupportedOperationException(); - } - - public boolean containsAll(Collection<?> c) { - return hidden.containsAll(c); - } - - public boolean addAll(Collection<? 
extends ElementType> c) { - throw new UnsupportedOperationException(); - } - - public boolean retainAll(Collection<?> c) { - throw new UnsupportedOperationException(); - } - - public boolean removeAll(Collection<?> c) { - throw new UnsupportedOperationException(); - } - - public void clear() { - throw new UnsupportedOperationException(); - } - - @Override - public String toString() { - return "GrowOnlySet [hidden=" + hidden + "]"; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((hidden == null) ? 0 : hidden.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - @SuppressWarnings("rawtypes") - GrowOnlySet other = (GrowOnlySet) obj; - if (hidden == null) { - if (other.hidden != null) - return false; - } else if (!hidden.equals(other.hidden)) - return false; - return true; - } - - Set<ElementType> getElements(){ - return hidden; - } -} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/src/main/java/org/apache/gossip/crdt/OrSet.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/crdt/OrSet.java b/src/main/java/org/apache/gossip/crdt/OrSet.java deleted file mode 100644 index f84dbc7..0000000 --- a/src/main/java/org/apache/gossip/crdt/OrSet.java +++ /dev/null @@ -1,304 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gossip.crdt; - -import java.util.*; -import java.util.Map.Entry; -import java.util.function.BiConsumer; - -import org.apache.gossip.crdt.OrSet.Builder.Operation; - -/* - * A immutable set - */ -public class OrSet<E> implements Crdt<Set<E>, OrSet<E>> { - - private final Map<E, Set<UUID>> elements = new HashMap<>(); - private final Map<E, Set<UUID>> tombstones = new HashMap<>(); - private final transient Set<E> val; - - public OrSet(){ - val = computeValue(); - } - - OrSet(Map<E, Set<UUID>> elements, Map<E, Set<UUID>> tombstones){ - this.elements.putAll(elements); - this.tombstones.putAll(tombstones); - val = computeValue(); - } - - @SafeVarargs - public OrSet(E ... 
elements){ - for (E e: elements){ - internalAdd(e); - } - val = computeValue(); - } - - public OrSet(Builder<E>builder){ - for (Builder<E>.OrSetElement<E> e: builder.elements){ - if (e.operation == Operation.ADD){ - internalAdd(e.element); - } else { - internalRemove(e.element); - } - } - val = computeValue(); - } - - /** - * This constructor is the way to remove elements from an existing set - * @param set - * @param builder - */ - public OrSet(OrSet<E> set, Builder<E> builder){ - elements.putAll(set.elements); - tombstones.putAll(set.tombstones); - for (Builder<E>.OrSetElement<E> e: builder.elements){ - if (e.operation == Operation.ADD){ - internalAdd(e.element); - } else { - internalRemove(e.element); - } - } - val = computeValue(); - } - - static Set<UUID> mergeSets(Set<UUID> a, Set<UUID> b) { - if ((a == null || a.isEmpty()) && (b == null || b.isEmpty())) { - return null; - } - Set<UUID> res = new HashSet<>(a); - res.addAll(b); - return res; - } - - private void internalSetMerge(Map<E, Set<UUID>> map, E key, Set<UUID> value) { - if (value == null) { - return; - } - map.merge(key, value, OrSet::mergeSets); - } - - public OrSet(OrSet<E> left, OrSet<E> right){ - BiConsumer<Map<E, Set<UUID>>, Map<E, Set<UUID>>> internalMerge = (items, other) -> { - for (Entry<E, Set<UUID>> l : other.entrySet()){ - internalSetMerge(items, l.getKey(), l.getValue()); - } - }; - - internalMerge.accept(elements, left.elements); - internalMerge.accept(elements, right.elements); - internalMerge.accept(tombstones, left.tombstones); - internalMerge.accept(tombstones, right.tombstones); - - val = computeValue(); - } - - public OrSet.Builder<E> builder(){ - return new OrSet.Builder<>(); - } - - @Override - public OrSet<E> merge(OrSet<E> other) { - return new OrSet<E>(this, other); - } - - private void internalAdd(E element) { - Set<UUID> toMerge = new HashSet<>(); - toMerge.add(UUID.randomUUID()); - internalSetMerge(elements, element, toMerge); - } - - private void internalRemove(E element){ 
- internalSetMerge(tombstones, element, elements.get(element)); - } - - /* - * Computes the live values by analyzing the elements and tombstones - */ - private Set<E> computeValue(){ - Set<E> values = new HashSet<>(); - for (Entry<E, Set<UUID>> entry: elements.entrySet()){ - Set<UUID> deleteIds = tombstones.get(entry.getKey()); - // if not all tokens for current element are in tombstones - if (deleteIds == null || !deleteIds.containsAll(entry.getValue())) { - values.add(entry.getKey()); - } - } - return values; - } - - @Override - public Set<E> value() { - return val; - } - - @Override - public OrSet<E> optimize() { - return this; - } - - public static class Builder<E> { - public static enum Operation { - ADD, REMOVE - }; - - private class OrSetElement<EL> { - EL element; - Operation operation; - - private OrSetElement(EL element, Operation operation) { - this.element = element; - this.operation = operation; - } - } - - private List<OrSetElement<E>> elements = new ArrayList<>(); - - public Builder<E> add(E element) { - elements.add(new OrSetElement<E>(element, Operation.ADD)); - return this; - } - - public Builder<E> remove(E element) { - elements.add(new OrSetElement<E>(element, Operation.REMOVE)); - return this; - } - - public Builder<E> mutate(E element, Operation operation) { - elements.add(new OrSetElement<E>(element, operation)); - return this; - } - } - - - public int size() { - return value().size(); - } - - - public boolean isEmpty() { - return value().size() == 0; - } - - - public boolean contains(Object o) { - return value().contains(o); - } - - - public Iterator<E> iterator() { - Iterator<E> managed = value().iterator(); - return new Iterator<E>() { - - @Override - public void remove() { - throw new IllegalArgumentException(); - } - - @Override - public boolean hasNext() { - return managed.hasNext(); - } - - @Override - public E next() { - return managed.next(); - } - - }; - } - - public Object[] toArray() { - return value().toArray(); - } - - public 
<T> T[] toArray(T[] a) { - return value().toArray(a); - } - - public boolean add(E e) { - throw new IllegalArgumentException("Can not add"); - } - - - public boolean remove(Object o) { - throw new IllegalArgumentException(); - } - - public boolean containsAll(Collection<?> c) { - return this.value().containsAll(c); - } - - public boolean addAll(Collection<? extends E> c) { - throw new IllegalArgumentException(); - } - - public boolean retainAll(Collection<?> c) { - throw new IllegalArgumentException(); - } - - public boolean removeAll(Collection<?> c) { - throw new IllegalArgumentException(); - } - - public void clear() { - throw new IllegalArgumentException(); - } - - @Override - public String toString() { - return "OrSet [elements=" + elements + ", tombstones=" + tombstones + "]" ; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((value() == null) ? 0 : value().hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - @SuppressWarnings("rawtypes") - OrSet other = (OrSet) obj; - if (elements == null) { - if (other.elements != null) - return false; - } else if (!value().equals(other.value())) - return false; - return true; - } - - Map<E, Set<UUID>> getElements() { - return elements; - } - - Map<E, Set<UUID>> getTombstones() { - return tombstones; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/src/main/java/org/apache/gossip/event/GossipListener.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/event/GossipListener.java b/src/main/java/org/apache/gossip/event/GossipListener.java deleted file mode 100644 index 9b33dab..0000000 --- a/src/main/java/org/apache/gossip/event/GossipListener.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache 
Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gossip.event; - -import org.apache.gossip.Member; - -public interface GossipListener { - void gossipEvent(Member member, GossipState state); -} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/src/main/java/org/apache/gossip/event/GossipState.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/event/GossipState.java b/src/main/java/org/apache/gossip/event/GossipState.java deleted file mode 100644 index 3b76c9e..0000000 --- a/src/main/java/org/apache/gossip/event/GossipState.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gossip.event; - -public enum GossipState { - UP("up"), DOWN("down"); - @SuppressWarnings("unused") - private final String state; - - private GossipState(String state) { - this.state = state; - } -} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java b/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java deleted file mode 100644 index 497894c..0000000 --- a/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gossip.examples; - -import java.net.URI; -import java.net.UnknownHostException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -import org.apache.gossip.GossipSettings; -import org.apache.gossip.RemoteMember; -import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper; -import org.apache.gossip.manager.GossipManager; -import org.apache.gossip.manager.GossipManagerBuilder; - -public class StandAloneDatacenterAndRack { - - public static void main (String [] args) throws UnknownHostException, InterruptedException { - GossipSettings s = new GossipSettings(); - s.setWindowSize(1000); - s.setGossipInterval(100); - s.setActiveGossipClass(DatacenterRackAwareActiveGossiper.class.getName()); - Map<String, String> gossipProps = new HashMap<>(); - gossipProps.put("sameRackGossipIntervalMs", "2000"); - gossipProps.put("differentDatacenterGossipIntervalMs", "10000"); - s.setActiveGossipProperties(gossipProps); - Map<String, String> props = new HashMap<>(); - props.put(DatacenterRackAwareActiveGossiper.DATACENTER, args[4]); - props.put(DatacenterRackAwareActiveGossiper.RACK, args[5]); - GossipManager manager = GossipManagerBuilder.newBuilder() - .cluster("mycluster") - .uri(URI.create(args[0])) - .id(args[1]) - .gossipSettings(s) - .gossipMembers(Arrays.asList(new RemoteMember("mycluster", URI.create(args[2]), args[3]))) - .properties(props) - .build(); - manager.init(); - while (true){ - System.out.println("Live: " + manager.getLiveMembers()); - System.out.println("Dead: " + manager.getDeadMembers()); - Thread.sleep(2000); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/src/main/java/org/apache/gossip/examples/StandAloneNode.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/examples/StandAloneNode.java b/src/main/java/org/apache/gossip/examples/StandAloneNode.java deleted file mode 100644 index 93421b1..0000000 
--- a/src/main/java/org/apache/gossip/examples/StandAloneNode.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gossip.examples; - -import java.net.URI; -import java.net.UnknownHostException; -import java.util.Arrays; -import org.apache.gossip.GossipSettings; -import org.apache.gossip.RemoteMember; -import org.apache.gossip.manager.GossipManager; -import org.apache.gossip.manager.GossipManagerBuilder; - -public class StandAloneNode { - public static void main (String [] args) throws UnknownHostException, InterruptedException{ - GossipSettings s = new GossipSettings(); - s.setWindowSize(1000); - s.setGossipInterval(100); - GossipManager gossipService = GossipManagerBuilder.newBuilder() - .cluster("mycluster") - .uri(URI.create(args[0])) - .id(args[1]) - .gossipMembers(Arrays.asList( new RemoteMember("mycluster", URI.create(args[2]), args[3]))) - .gossipSettings(s) - .build(); - gossipService.init(); - while (true){ - System.out.println("Live: " + gossipService.getLiveMembers()); - System.out.println("Dead: " + gossipService.getDeadMembers()); - Thread.sleep(2000); - } - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java b/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java deleted file mode 100644 index d78cf5e..0000000 --- a/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.gossip.examples; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.net.URI; -import java.util.Arrays; -import org.apache.gossip.GossipSettings; -import org.apache.gossip.RemoteMember; -import org.apache.gossip.crdt.GrowOnlyCounter; -import org.apache.gossip.crdt.OrSet; -import org.apache.gossip.manager.GossipManager; -import org.apache.gossip.manager.GossipManagerBuilder; -import org.apache.gossip.model.SharedDataMessage; - -public class StandAloneNodeCrdtOrSet { - public static void main (String [] args) throws InterruptedException, IOException{ - GossipSettings s = new GossipSettings(); - s.setWindowSize(1000); - s.setGossipInterval(100); - GossipManager gossipService = GossipManagerBuilder.newBuilder() - .cluster("mycluster") - .uri(URI.create(args[0])) - .id(args[1]) - .gossipMembers(Arrays.asList( new RemoteMember("mycluster", URI.create(args[2]), args[3]))) - .gossipSettings(s) - .build(); - gossipService.init(); - - new Thread(() -> { - while (true){ - System.out.println("Live: " + gossipService.getLiveMembers()); - System.out.println("Dead: " + gossipService.getDeadMembers()); - System.out.println("---------- " + (gossipService.findCrdt("abc") == null ? "": - gossipService.findCrdt("abc").value())); - System.out.println("********** " + gossipService.findCrdt("abc")); - System.out.println("^^^^^^^^^^ " + (gossipService.findCrdt("def") == null ? 
"": - gossipService.findCrdt("def").value())); - System.out.println("$$$$$$$$$$ " + gossipService.findCrdt("def")); - try { - Thread.sleep(2000); - } catch (Exception e) {} - } - }).start(); - - String line = null; - try (BufferedReader br = new BufferedReader(new InputStreamReader(System.in))){ - while ( (line = br.readLine()) != null){ - System.out.println(line); - char op = line.charAt(0); - String val = line.substring(2); - if (op == 'a'){ - addData(val, gossipService); - } else if (op == 'r') { - removeData(val, gossipService); - } else if (op == 'g'){ - gcount(val, gossipService); - } - } - } - } - - private static void gcount(String val, GossipManager gossipManager){ - GrowOnlyCounter c = (GrowOnlyCounter) gossipManager.findCrdt("def"); - Long l = Long.valueOf(val); - if (c == null){ - c = new GrowOnlyCounter(new GrowOnlyCounter.Builder(gossipManager).increment((l))); - } else { - c = new GrowOnlyCounter(c, new GrowOnlyCounter.Builder(gossipManager).increment((l))); - } - SharedDataMessage m = new SharedDataMessage(); - m.setExpireAt(Long.MAX_VALUE); - m.setKey("def"); - m.setPayload(c); - m.setTimestamp(System.currentTimeMillis()); - gossipManager.merge(m); - } - - private static void removeData(String val, GossipManager gossipService){ - @SuppressWarnings("unchecked") - OrSet<String> s = (OrSet<String>) gossipService.findCrdt("abc"); - SharedDataMessage m = new SharedDataMessage(); - m.setExpireAt(Long.MAX_VALUE); - m.setKey("abc"); - m.setPayload(new OrSet<String>(s , new OrSet.Builder<String>().remove(val))); - m.setTimestamp(System.currentTimeMillis()); - gossipService.merge(m); - } - - private static void addData(String val, GossipManager gossipService){ - SharedDataMessage m = new SharedDataMessage(); - m.setExpireAt(Long.MAX_VALUE); - m.setKey("abc"); - m.setPayload(new OrSet<String>(val)); - m.setTimestamp(System.currentTimeMillis()); - gossipService.merge(m); - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java b/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java deleted file mode 100644 index b73550e..0000000 --- a/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.gossip.manager; - -import java.util.Map.Entry; -import java.util.List; -import java.util.Random; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.MetricRegistry; -import org.apache.gossip.LocalMember; -import org.apache.gossip.model.ActiveGossipOk; -import org.apache.gossip.model.PerNodeDataMessage; -import org.apache.gossip.model.Member; -import org.apache.gossip.model.Response; -import org.apache.gossip.model.SharedDataMessage; -import org.apache.gossip.model.ShutdownMessage; -import org.apache.gossip.udp.UdpActiveGossipMessage; -import org.apache.gossip.udp.UdpPerNodeDataMessage; -import org.apache.gossip.udp.UdpSharedDataMessage; -import org.apache.log4j.Logger; - -import static com.codahale.metrics.MetricRegistry.name; - -/** - * The ActiveGossipThread is sends information. Pick a random partner and send the membership list to that partner - */ -public abstract class AbstractActiveGossiper { - - protected static final Logger LOGGER = Logger.getLogger(AbstractActiveGossiper.class); - - protected final GossipManager gossipManager; - protected final GossipCore gossipCore; - private final Histogram sharedDataHistogram; - private final Histogram sendPerNodeDataHistogram; - private final Histogram sendMembershipHistorgram; - private final Random random; - - public AbstractActiveGossiper(GossipManager gossipManager, GossipCore gossipCore, MetricRegistry registry) { - this.gossipManager = gossipManager; - this.gossipCore = gossipCore; - sharedDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sharedDataHistogram-time")); - sendPerNodeDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendPerNodeDataHistogram-time")); - sendMembershipHistorgram = registry.histogram(name(AbstractActiveGossiper.class, "sendMembershipHistorgram-time")); - random = new Random(); - } - - public void init() { - - } - - public void 
shutdown() { - - } - - public final void sendShutdownMessage(LocalMember me, LocalMember target){ - if (target == null){ - return; - } - ShutdownMessage m = new ShutdownMessage(); - m.setNodeId(me.getId()); - m.setShutdownAtNanos(gossipManager.getClock().nanoTime()); - gossipCore.sendOneWay(m, target.getUri()); - } - - public final void sendSharedData(LocalMember me, LocalMember member){ - if (member == null){ - return; - } - long startTime = System.currentTimeMillis(); - for (Entry<String, SharedDataMessage> innerEntry : gossipCore.getSharedData().entrySet()){ - UdpSharedDataMessage message = new UdpSharedDataMessage(); - message.setUuid(UUID.randomUUID().toString()); - message.setUriFrom(me.getId()); - message.setExpireAt(innerEntry.getValue().getExpireAt()); - message.setKey(innerEntry.getValue().getKey()); - message.setNodeId(innerEntry.getValue().getNodeId()); - message.setTimestamp(innerEntry.getValue().getTimestamp()); - message.setPayload(innerEntry.getValue().getPayload()); - gossipCore.sendOneWay(message, member.getUri()); - } - sharedDataHistogram.update(System.currentTimeMillis() - startTime); - } - - public final void sendPerNodeData(LocalMember me, LocalMember member){ - if (member == null){ - return; - } - long startTime = System.currentTimeMillis(); - for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){ - for (Entry<String, PerNodeDataMessage> innerEntry : entry.getValue().entrySet()){ - UdpPerNodeDataMessage message = new UdpPerNodeDataMessage(); - message.setUuid(UUID.randomUUID().toString()); - message.setUriFrom(me.getId()); - message.setExpireAt(innerEntry.getValue().getExpireAt()); - message.setKey(innerEntry.getValue().getKey()); - message.setNodeId(innerEntry.getValue().getNodeId()); - message.setTimestamp(innerEntry.getValue().getTimestamp()); - message.setPayload(innerEntry.getValue().getPayload()); - gossipCore.sendOneWay(message, member.getUri()); - } - } - 
sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime); - } - - /** - * Performs the sending of the membership list, after we have incremented our own heartbeat. - */ - protected void sendMembershipList(LocalMember me, LocalMember member) { - if (member == null){ - return; - } - long startTime = System.currentTimeMillis(); - me.setHeartbeat(System.nanoTime()); - UdpActiveGossipMessage message = new UdpActiveGossipMessage(); - message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString()); - message.setUuid(UUID.randomUUID().toString()); - message.getMembers().add(convert(me)); - for (LocalMember other : gossipManager.getMembers().keySet()) { - message.getMembers().add(convert(other)); - } - Response r = gossipCore.send(message, member.getUri()); - if (r instanceof ActiveGossipOk){ - //maybe count metrics here - } else { - LOGGER.debug("Message " + message + " generated response " + r); - } - sendMembershipHistorgram.update(System.currentTimeMillis() - startTime); - } - - protected final Member convert(LocalMember member){ - Member gm = new Member(); - gm.setCluster(member.getClusterName()); - gm.setHeartbeat(member.getHeartbeat()); - gm.setUri(member.getUri().toASCIIString()); - gm.setId(member.getId()); - gm.setProperties(member.getProperties()); - return gm; - } - - /** - * - * @param memberList - * An immutable list - * @return The chosen LocalGossipMember to gossip with. 
- */ - protected LocalMember selectPartner(List<LocalMember> memberList) { - LocalMember member = null; - if (memberList.size() > 0) { - int randomNeighborIndex = random.nextInt(memberList.size()); - member = memberList.get(randomNeighborIndex); - } - return member; - } -} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/src/main/java/org/apache/gossip/manager/Clock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/Clock.java b/src/main/java/org/apache/gossip/manager/Clock.java deleted file mode 100644 index 6629c62..0000000 --- a/src/main/java/org/apache/gossip/manager/Clock.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.gossip.manager; - -public interface Clock { - - long currentTimeMillis(); - long nanoTime(); - -} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/src/main/java/org/apache/gossip/manager/DataReaper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/DataReaper.java b/src/main/java/org/apache/gossip/manager/DataReaper.java deleted file mode 100644 index 8175a1b..0000000 --- a/src/main/java/org/apache/gossip/manager/DataReaper.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gossip.manager; - -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import org.apache.gossip.model.PerNodeDataMessage; -import org.apache.gossip.model.SharedDataMessage; - -/** - * We wish to periodically sweep user data and remove entries past their timestamp. This - * implementation periodically sweeps through the data and removes old entries. 
While it might make - * sense to use a more specific high performance data-structure to handle eviction, keep in mind - * that we are not looking to store a large quantity of data as we currently have to transmit this - * data cluster wide. - */ -public class DataReaper { - - private final GossipCore gossipCore; - private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1); - private final Clock clock; - - public DataReaper(GossipCore gossipCore, Clock clock){ - this.gossipCore = gossipCore; - this.clock = clock; - } - - public void init(){ - Runnable reapPerNodeData = () -> { - runPerNodeOnce(); - runSharedOnce(); - }; - scheduledExecutor.scheduleAtFixedRate(reapPerNodeData, 0, 5, TimeUnit.SECONDS); - } - - void runSharedOnce(){ - for (Entry<String, SharedDataMessage> entry : gossipCore.getSharedData().entrySet()){ - if (entry.getValue().getExpireAt() < clock.currentTimeMillis()){ - gossipCore.getSharedData().remove(entry.getKey(), entry.getValue()); - } - } - } - - void runPerNodeOnce(){ - for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> node : gossipCore.getPerNodeData().entrySet()){ - reapData(node.getValue()); - } - } - - void reapData(ConcurrentHashMap<String, PerNodeDataMessage> concurrentHashMap){ - for (Entry<String, PerNodeDataMessage> entry : concurrentHashMap.entrySet()){ - if (entry.getValue().getExpireAt() < clock.currentTimeMillis()){ - concurrentHashMap.remove(entry.getKey(), entry.getValue()); - } - } - } - - public void close(){ - scheduledExecutor.shutdown(); - try { - scheduledExecutor.awaitTermination(1, TimeUnit.SECONDS); - } catch (InterruptedException e) { - - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java 
b/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java deleted file mode 100644 index 2f489a2..0000000 --- a/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java +++ /dev/null @@ -1,244 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gossip.manager; - -import java.util.List; -import java.util.ArrayList; -import java.util.Collections; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import org.apache.gossip.LocalMember; - -import com.codahale.metrics.MetricRegistry; - -/** - * Sends gossip traffic at different rates to other racks and data-centers. - * This implementation controls the rate at which gossip traffic is shared. - * There are two constructs Datacenter and Rack. It is assumed that bandwidth and latency is higher - * in the rack than in the the datacenter. We can adjust the rate at which we send messages to each group. 
- * - */ -public class DatacenterRackAwareActiveGossiper extends AbstractActiveGossiper { - - public static final String DATACENTER = "datacenter"; - public static final String RACK = "rack"; - - private int sameRackGossipIntervalMs = 100; - private int sameDcGossipIntervalMs = 500; - private int differentDatacenterGossipIntervalMs = 1000; - private int randomDeadMemberSendIntervalMs = 250; - - private ScheduledExecutorService scheduledExecutorService; - private final BlockingQueue<Runnable> workQueue; - private ThreadPoolExecutor threadService; - - public DatacenterRackAwareActiveGossiper(GossipManager gossipManager, GossipCore gossipCore, - MetricRegistry registry) { - super(gossipManager, gossipCore, registry); - scheduledExecutorService = Executors.newScheduledThreadPool(2); - workQueue = new ArrayBlockingQueue<Runnable>(1024); - threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue, - new ThreadPoolExecutor.DiscardOldestPolicy()); - try { - sameRackGossipIntervalMs = Integer.parseInt(gossipManager.getSettings() - .getActiveGossipProperties().get("sameRackGossipIntervalMs")); - } catch (RuntimeException ex) { } - try { - sameDcGossipIntervalMs = Integer.parseInt(gossipManager.getSettings() - .getActiveGossipProperties().get("sameDcGossipIntervalMs")); - } catch (RuntimeException ex) { } - try { - differentDatacenterGossipIntervalMs = Integer.parseInt(gossipManager.getSettings() - .getActiveGossipProperties().get("differentDatacenterGossipIntervalMs")); - } catch (RuntimeException ex) { } - try { - randomDeadMemberSendIntervalMs = Integer.parseInt(gossipManager.getSettings() - .getActiveGossipProperties().get("randomDeadMemberSendIntervalMs")); - } catch (RuntimeException ex) { } - } - - @Override - public void init() { - super.init(); - //same rack - scheduledExecutorService.scheduleAtFixedRate(() -> - threadService.execute(() -> sendToSameRackMember()), - 0, sameRackGossipIntervalMs, TimeUnit.MILLISECONDS); - - 
scheduledExecutorService.scheduleAtFixedRate(() -> - threadService.execute(() -> sendToSameRackMemberPerNode()), - 0, sameRackGossipIntervalMs, TimeUnit.MILLISECONDS); - - scheduledExecutorService.scheduleAtFixedRate(() -> - threadService.execute(() -> sendToSameRackShared()), - 0, sameRackGossipIntervalMs, TimeUnit.MILLISECONDS); - - //same dc different rack - scheduledExecutorService.scheduleAtFixedRate(() -> - threadService.execute(() -> sameDcDiffernetRackMember()), - 0, sameDcGossipIntervalMs, TimeUnit.MILLISECONDS); - - scheduledExecutorService.scheduleAtFixedRate(() -> - threadService.execute(() -> sameDcDiffernetRackPerNode()), - 0, sameDcGossipIntervalMs, TimeUnit.MILLISECONDS); - - scheduledExecutorService.scheduleAtFixedRate(() -> - threadService.execute(() -> sameDcDiffernetRackShared()), - 0, sameDcGossipIntervalMs, TimeUnit.MILLISECONDS); - - //different dc - scheduledExecutorService.scheduleAtFixedRate(() -> - threadService.execute(() -> differentDcMember()), - 0, differentDatacenterGossipIntervalMs, TimeUnit.MILLISECONDS); - - scheduledExecutorService.scheduleAtFixedRate(() -> - threadService.execute(() -> differentDcPerNode()), - 0, differentDatacenterGossipIntervalMs, TimeUnit.MILLISECONDS); - - scheduledExecutorService.scheduleAtFixedRate(() -> - threadService.execute(() -> differentDcShared()), - 0, differentDatacenterGossipIntervalMs, TimeUnit.MILLISECONDS); - - //the dead - scheduledExecutorService.scheduleAtFixedRate(() -> - threadService.execute(() -> sendToDeadMember()), - 0, randomDeadMemberSendIntervalMs, TimeUnit.MILLISECONDS); - - } - - private void sendToDeadMember() { - sendMembershipList(gossipManager.getMyself(), selectPartner(gossipManager.getDeadMembers())); - } - - private List<LocalMember> differentDataCenter(){ - String myDc = gossipManager.getMyself().getProperties().get(DATACENTER); - String rack = gossipManager.getMyself().getProperties().get(RACK); - if (myDc == null|| rack == null){ - return Collections.emptyList(); - } - 
List<LocalMember> notMyDc = new ArrayList<LocalMember>(10); - for (LocalMember i : gossipManager.getLiveMembers()){ - if (!myDc.equals(i.getProperties().get(DATACENTER))){ - notMyDc.add(i); - } - } - return notMyDc; - } - - private List<LocalMember> sameDatacenterDifferentRack(){ - String myDc = gossipManager.getMyself().getProperties().get(DATACENTER); - String rack = gossipManager.getMyself().getProperties().get(RACK); - if (myDc == null|| rack == null){ - return Collections.emptyList(); - } - List<LocalMember> notMyDc = new ArrayList<LocalMember>(10); - for (LocalMember i : gossipManager.getLiveMembers()){ - if (myDc.equals(i.getProperties().get(DATACENTER)) && !rack.equals(i.getProperties().get(RACK))){ - notMyDc.add(i); - } - } - return notMyDc; - } - - private List<LocalMember> sameRackNodes(){ - String myDc = gossipManager.getMyself().getProperties().get(DATACENTER); - String rack = gossipManager.getMyself().getProperties().get(RACK); - if (myDc == null|| rack == null){ - return Collections.emptyList(); - } - List<LocalMember> sameDcAndRack = new ArrayList<LocalMember>(10); - for (LocalMember i : gossipManager.getLiveMembers()){ - if (myDc.equals(i.getProperties().get(DATACENTER)) - && rack.equals(i.getProperties().get(RACK))){ - sameDcAndRack.add(i); - } - } - return sameDcAndRack; - } - - private void sendToSameRackMember() { - LocalMember i = selectPartner(sameRackNodes()); - sendMembershipList(gossipManager.getMyself(), i); - } - - private void sendToSameRackMemberPerNode() { - sendPerNodeData(gossipManager.getMyself(), selectPartner(sameRackNodes())); - } - - private void sendToSameRackShared() { - sendSharedData(gossipManager.getMyself(), selectPartner(sameRackNodes())); - } - - private void differentDcMember() { - sendMembershipList(gossipManager.getMyself(), selectPartner(differentDataCenter())); - } - - private void differentDcPerNode() { - sendPerNodeData(gossipManager.getMyself(), selectPartner(differentDataCenter())); - } - - private void 
differentDcShared() { - sendSharedData(gossipManager.getMyself(), selectPartner(differentDataCenter())); - } - - private void sameDcDiffernetRackMember() { - sendMembershipList(gossipManager.getMyself(), selectPartner(sameDatacenterDifferentRack())); - } - - private void sameDcDiffernetRackPerNode() { - sendPerNodeData(gossipManager.getMyself(), selectPartner(sameDatacenterDifferentRack())); - } - - private void sameDcDiffernetRackShared() { - sendSharedData(gossipManager.getMyself(), selectPartner(sameDatacenterDifferentRack())); - } - - @Override - public void shutdown() { - super.shutdown(); - scheduledExecutorService.shutdown(); - try { - scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS); - } catch (InterruptedException e) { - LOGGER.debug("Issue during shutdown", e); - } - sendShutdownMessage(); - threadService.shutdown(); - try { - threadService.awaitTermination(5, TimeUnit.SECONDS); - } catch (InterruptedException e) { - LOGGER.debug("Issue during shutdown", e); - } - } - - /** - * sends an optimistic shutdown message to several clusters nodes - */ - protected void sendShutdownMessage(){ - List<LocalMember> l = gossipManager.getLiveMembers(); - int sendTo = l.size() < 3 ? 1 : l.size() / 3; - for (int i = 0; i < sendTo; i++) { - threadService.execute(() -> sendShutdownMessage(gossipManager.getMyself(), selectPartner(l))); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/src/main/java/org/apache/gossip/manager/GossipCore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java deleted file mode 100644 index f53419d..0000000 --- a/src/main/java/org/apache/gossip/manager/GossipCore.java +++ /dev/null @@ -1,387 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gossip.manager; - -import com.codahale.metrics.Gauge; -import com.codahale.metrics.Meter; -import com.codahale.metrics.MetricRegistry; -import org.apache.gossip.Member; -import org.apache.gossip.LocalMember; -import org.apache.gossip.RemoteMember; -import org.apache.gossip.crdt.Crdt; -import org.apache.gossip.event.GossipState; -import org.apache.gossip.model.*; -import org.apache.gossip.udp.Trackable; -import org.apache.log4j.Logger; - -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.InetAddress; -import java.net.URI; -import java.security.*; -import java.security.spec.InvalidKeySpecException; -import java.security.spec.PKCS8EncodedKeySpec; -import java.util.List; -import java.util.Map.Entry; -import java.util.concurrent.*; - -public class GossipCore implements GossipCoreConstants { - - class LatchAndBase { - private final CountDownLatch latch; - private volatile Base base; - - LatchAndBase(){ - latch = new CountDownLatch(1); - } - - } - public static final Logger LOGGER = Logger.getLogger(GossipCore.class); - private final GossipManager gossipManager; - private ConcurrentHashMap<String, LatchAndBase> requests; - private 
ThreadPoolExecutor service; - private final ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> perNodeData; - private final ConcurrentHashMap<String, SharedDataMessage> sharedData; - private final BlockingQueue<Runnable> workQueue; - private final PKCS8EncodedKeySpec privKeySpec; - private final PrivateKey privKey; - private final Meter messageSerdeException; - private final Meter tranmissionException; - private final Meter tranmissionSuccess; - - public GossipCore(GossipManager manager, MetricRegistry metrics){ - this.gossipManager = manager; - requests = new ConcurrentHashMap<>(); - workQueue = new ArrayBlockingQueue<>(1024); - service = new ThreadPoolExecutor(1, 5, 1, TimeUnit.SECONDS, workQueue, new ThreadPoolExecutor.DiscardOldestPolicy()); - perNodeData = new ConcurrentHashMap<>(); - sharedData = new ConcurrentHashMap<>(); - metrics.register(WORKQUEUE_SIZE, (Gauge<Integer>)() -> workQueue.size()); - metrics.register(PER_NODE_DATA_SIZE, (Gauge<Integer>)() -> perNodeData.size()); - metrics.register(SHARED_DATA_SIZE, (Gauge<Integer>)() -> sharedData.size()); - metrics.register(REQUEST_SIZE, (Gauge<Integer>)() -> requests.size()); - metrics.register(THREADPOOL_ACTIVE, (Gauge<Integer>)() -> service.getActiveCount()); - metrics.register(THREADPOOL_SIZE, (Gauge<Integer>)() -> service.getPoolSize()); - messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION); - tranmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION); - tranmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS); - - if (manager.getSettings().isSignMessages()){ - File privateKey = new File(manager.getSettings().getPathToKeyStore(), manager.getMyself().getId()); - File publicKey = new File(manager.getSettings().getPathToKeyStore(), manager.getMyself().getId() + ".pub"); - if (!privateKey.exists()){ - throw new IllegalArgumentException("private key not found " + privateKey); - } - if (!publicKey.exists()){ - throw new IllegalArgumentException("public key not 
found " + publicKey); - } - try (FileInputStream keyfis = new FileInputStream(privateKey)) { - byte[] encKey = new byte[keyfis.available()]; - keyfis.read(encKey); - keyfis.close(); - privKeySpec = new PKCS8EncodedKeySpec(encKey); - KeyFactory keyFactory = KeyFactory.getInstance("DSA"); - privKey = keyFactory.generatePrivate(privKeySpec); - } catch (NoSuchAlgorithmException | InvalidKeySpecException | IOException e) { - throw new RuntimeException("failed hard", e); - } - } else { - privKeySpec = null; - privKey = null; - } - } - - private byte [] sign(byte [] bytes){ - Signature dsa; - try { - dsa = Signature.getInstance("SHA1withDSA", "SUN"); - dsa.initSign(privKey); - dsa.update(bytes); - return dsa.sign(); - } catch (NoSuchAlgorithmException | NoSuchProviderException | InvalidKeyException | SignatureException e) { - throw new RuntimeException(e); - } - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - public void addSharedData(SharedDataMessage message) { - while (true){ - SharedDataMessage previous = sharedData.putIfAbsent(message.getKey(), message); - if (previous == null){ - return; - } - if (message.getPayload() instanceof Crdt){ - SharedDataMessage merged = new SharedDataMessage(); - merged.setExpireAt(message.getExpireAt()); - merged.setKey(message.getKey()); - merged.setNodeId(message.getNodeId()); - merged.setTimestamp(message.getTimestamp()); - Crdt mergedCrdt = ((Crdt) previous.getPayload()).merge((Crdt) message.getPayload()); - merged.setPayload(mergedCrdt); - boolean replaced = sharedData.replace(message.getKey(), previous, merged); - if (replaced){ - return; - } - } else { - if (previous.getTimestamp() < message.getTimestamp()){ - boolean result = sharedData.replace(message.getKey(), previous, message); - if (result){ - return; - } - } else { - return; - } - } - } - } - - public void addPerNodeData(PerNodeDataMessage message){ - ConcurrentHashMap<String,PerNodeDataMessage> nodeMap = new ConcurrentHashMap<>(); - nodeMap.put(message.getKey(), 
message); - nodeMap = perNodeData.putIfAbsent(message.getNodeId(), nodeMap); - if (nodeMap != null){ - PerNodeDataMessage current = nodeMap.get(message.getKey()); - if (current == null){ - nodeMap.putIfAbsent(message.getKey(), message); - } else { - if (current.getTimestamp() < message.getTimestamp()){ - nodeMap.replace(message.getKey(), current, message); - } - } - } - } - - public ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> getPerNodeData(){ - return perNodeData; - } - - public ConcurrentHashMap<String, SharedDataMessage> getSharedData() { - return sharedData; - } - - public void shutdown(){ - service.shutdown(); - try { - service.awaitTermination(1, TimeUnit.SECONDS); - } catch (InterruptedException e) { - LOGGER.warn(e); - } - service.shutdownNow(); - } - - public void receive(Base base) { - if (!gossipManager.getMessageInvoker().invoke(this, gossipManager, base)) { - LOGGER.warn("received message can not be handled"); - } - } - - /** - * Sends a blocking message. 
- * @param message - * @param uri - * @throws RuntimeException if data can not be serialized or in transmission error - */ - private void sendInternal(Base message, URI uri){ - byte[] json_bytes; - try { - if (privKey == null){ - json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(message); - } else { - SignedPayload p = new SignedPayload(); - p.setData(gossipManager.getObjectMapper().writeValueAsString(message).getBytes()); - p.setSignature(sign(p.getData())); - json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(p); - } - } catch (IOException e) { - messageSerdeException.mark(); - throw new RuntimeException(e); - } - try (DatagramSocket socket = new DatagramSocket()) { - socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2); - InetAddress dest = InetAddress.getByName(uri.getHost()); - DatagramPacket datagramPacket = new DatagramPacket(json_bytes, json_bytes.length, dest, uri.getPort()); - socket.send(datagramPacket); - tranmissionSuccess.mark(); - } catch (IOException e) { - tranmissionException.mark(); - throw new RuntimeException(e); - } - } - - public Response send(Base message, URI uri){ - if (LOGGER.isDebugEnabled()){ - LOGGER.debug("Sending " + message); - LOGGER.debug("Current request queue " + requests); - } - - final Trackable t; - LatchAndBase latchAndBase = null; - if (message instanceof Trackable){ - t = (Trackable) message; - latchAndBase = new LatchAndBase(); - requests.put(t.getUuid() + "/" + t.getUriFrom(), latchAndBase); - } else { - t = null; - } - sendInternal(message, uri); - if (latchAndBase == null){ - return null; - } - - try { - boolean complete = latchAndBase.latch.await(1, TimeUnit.SECONDS); - if (complete){ - return (Response) latchAndBase.base; - } else{ - return null; - } - } catch (InterruptedException e) { - throw new RuntimeException(e); - } finally { - if (latchAndBase != null){ - requests.remove(t.getUuid() + "/" + t.getUriFrom()); - } - } - } - - /** - * Sends a message across the network 
while blocking. Catches and ignores IOException in transmission. Used - * when the protocol for the message is not to wait for a response - * @param message the message to send - * @param u the uri to send it to - */ - public void sendOneWay(Base message, URI u){ - byte[] json_bytes; - try { - if (privKey == null){ - json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(message); - } else { - SignedPayload p = new SignedPayload(); - p.setData(gossipManager.getObjectMapper().writeValueAsString(message).getBytes()); - p.setSignature(sign(p.getData())); - json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(p); - } - } catch (IOException e) { - messageSerdeException.mark(); - throw new RuntimeException(e); - } - try (DatagramSocket socket = new DatagramSocket()) { - socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2); - InetAddress dest = InetAddress.getByName(u.getHost()); - DatagramPacket datagramPacket = new DatagramPacket(json_bytes, json_bytes.length, dest, u.getPort()); - socket.send(datagramPacket); - tranmissionSuccess.mark(); - } catch (IOException ex) { - tranmissionException.mark(); - LOGGER.debug("Send one way failed", ex); - } - } - - public void handleResponse(String k, Base v) { - LatchAndBase latch = requests.get(k); - latch.base = v; - latch.latch.countDown(); - } - - /** - * Merge lists from remote members and update heartbeats - * - * @param gossipManager - * @param senderMember - * @param remoteList - * - */ - public void mergeLists(GossipManager gossipManager, RemoteMember senderMember, - List<Member> remoteList) { - if (LOGGER.isDebugEnabled()){ - debugState(senderMember, remoteList); - } - for (LocalMember i : gossipManager.getDeadMembers()) { - if (i.getId().equals(senderMember.getId())) { - LOGGER.debug(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri()); - i.recordHeartbeat(senderMember.getHeartbeat()); - i.setHeartbeat(senderMember.getHeartbeat()); - //TODO consider 
forcing an UP here - } - } - for (Member remoteMember : remoteList) { - if (remoteMember.getId().equals(gossipManager.getMyself().getId())) { - continue; - } - LocalMember aNewMember = new LocalMember(remoteMember.getClusterName(), - remoteMember.getUri(), - remoteMember.getId(), - remoteMember.getHeartbeat(), - remoteMember.getProperties(), - gossipManager.getSettings().getWindowSize(), - gossipManager.getSettings().getMinimumSamples(), - gossipManager.getSettings().getDistribution()); - aNewMember.recordHeartbeat(remoteMember.getHeartbeat()); - Object result = gossipManager.getMembers().putIfAbsent(aNewMember, GossipState.UP); - if (result != null){ - for (Entry<LocalMember, GossipState> localMember : gossipManager.getMembers().entrySet()){ - if (localMember.getKey().getId().equals(remoteMember.getId())){ - localMember.getKey().recordHeartbeat(remoteMember.getHeartbeat()); - localMember.getKey().setHeartbeat(remoteMember.getHeartbeat()); - localMember.getKey().setProperties(remoteMember.getProperties()); - } - } - } - } - if (LOGGER.isDebugEnabled()){ - debugState(senderMember, remoteList); - } - } - - private void debugState(RemoteMember senderMember, - List<Member> remoteList){ - LOGGER.warn( - "-----------------------\n" + - "Me " + gossipManager.getMyself() + "\n" + - "Sender " + senderMember + "\n" + - "RemoteList " + remoteList + "\n" + - "Live " + gossipManager.getLiveMembers()+ "\n" + - "Dead " + gossipManager.getDeadMembers()+ "\n" + - "======================="); - } - - @SuppressWarnings("rawtypes") - public Crdt merge(SharedDataMessage message) { - for (;;){ - SharedDataMessage previous = sharedData.putIfAbsent(message.getKey(), message); - if (previous == null){ - return (Crdt) message.getPayload(); - } - SharedDataMessage copy = new SharedDataMessage(); - copy.setExpireAt(message.getExpireAt()); - copy.setKey(message.getKey()); - copy.setNodeId(message.getNodeId()); - copy.setTimestamp(message.getTimestamp()); - @SuppressWarnings("unchecked") - Crdt 
merged = ((Crdt) previous.getPayload()).merge((Crdt) message.getPayload()); - copy.setPayload(merged); - boolean replaced = sharedData.replace(message.getKey(), previous, copy); - if (replaced){ - return merged; - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/src/main/java/org/apache/gossip/manager/GossipCoreConstants.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/GossipCoreConstants.java b/src/main/java/org/apache/gossip/manager/GossipCoreConstants.java deleted file mode 100644 index 6d3765a..0000000 --- a/src/main/java/org/apache/gossip/manager/GossipCoreConstants.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.gossip.manager; - -public interface GossipCoreConstants { - String WORKQUEUE_SIZE = "gossip.core.workqueue.size"; - String PER_NODE_DATA_SIZE = "gossip.core.pernodedata.size"; - String SHARED_DATA_SIZE = "gossip.core.shareddata.size"; - String REQUEST_SIZE = "gossip.core.requests.size"; - String THREADPOOL_ACTIVE = "gossip.core.threadpool.active"; - String THREADPOOL_SIZE = "gossip.core.threadpool.size"; - String MESSAGE_SERDE_EXCEPTION = "gossip.core.message_serde_exception"; - String MESSAGE_TRANSMISSION_EXCEPTION = "gossip.core.message_transmission_exception"; - String MESSAGE_TRANSMISSION_SUCCESS = "gossip.core.message_transmission_success"; -} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/src/main/java/org/apache/gossip/manager/GossipManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java deleted file mode 100644 index c2b50ae..0000000 --- a/src/main/java/org/apache/gossip/manager/GossipManager.java +++ /dev/null @@ -1,319 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-package org.apache.gossip.manager;
-
-import com.codahale.metrics.MetricRegistry;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.gossip.GossipSettings;
-import org.apache.gossip.LocalMember;
-import org.apache.gossip.Member;
-import org.apache.gossip.crdt.Crdt;
-import org.apache.gossip.event.GossipListener;
-import org.apache.gossip.event.GossipState;
-import org.apache.gossip.manager.handlers.MessageInvoker;
-import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
-import org.apache.gossip.model.PerNodeDataMessage;
-import org.apache.gossip.model.SharedDataMessage;
-import org.apache.log4j.Logger;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.net.URI;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-
-/**
- * Owns the moving parts of a single gossip node: the membership table, the
- * {@link GossipCore}, the {@link DataReaper}, the ring/user-data persisters, the member
- * state refresher and the active and passive gossip threads.
- *
- * <p>The constructor wires these together and loads persisted state; {@link #init()}
- * starts the threads and periodic tasks; {@link #shutdown()} stops them.
- */
-public abstract class GossipManager {
-
-  public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
-
-  private final ConcurrentSkipListMap<LocalMember, GossipState> members;
-  private final LocalMember me;
-  private final GossipSettings settings;
-  private final AtomicBoolean gossipServiceRunning;
-  private AbstractActiveGossiper activeGossipThread;
-  private PassiveGossipThread passiveGossipThread;
-  private ExecutorService gossipThreadExecutor;
-  private final GossipCore gossipCore;
-  private final DataReaper dataReaper;
-  private final Clock clock;
-  private final ScheduledExecutorService scheduledServiced;
-  private final MetricRegistry registry;
-  private final RingStatePersister ringState;
-  private final UserDataPersister userDataState;
-  private final GossipMemberStateRefresher memberStateRefresher;
-  private final ObjectMapper objectMapper;
-
-  private final MessageInvoker messageInvoker;
-
-  /**
-   * Builds the manager and its collaborators, registers every startup member other than
-   * this node in the {@link GossipState#DOWN} state, and then loads whatever the ring and
-   * user-data persisters return from disk.
-   *
-   * @param cluster cluster name of the local member
-   * @param uri uri of the local member
-   * @param id id of the local member
-   * @param properties properties of the local member
-   * @param settings gossip settings (window size, minimum samples, distribution, ...)
-   * @param gossipMembers seed members; an entry equal to the local member is skipped
-   * @param listener listener handed to the {@link GossipMemberStateRefresher}
-   * @param registry metric registry handed to {@link GossipCore} and exposed by
-   *     {@link #getRegistry()}
-   * @param objectMapper mapper exposed by {@link #getObjectMapper()}
-   * @param messageInvoker invoker exposed by {@link #getMessageInvoker()}
-   */
-  public GossipManager(String cluster,
-          URI uri, String id, Map<String, String> properties, GossipSettings settings,
-          List<Member> gossipMembers, GossipListener listener, MetricRegistry registry,
-          ObjectMapper objectMapper, MessageInvoker messageInvoker) {
-    this.settings = settings;
-    this.messageInvoker = messageInvoker;
-
-    clock = new SystemClock();
-    me = new LocalMember(cluster, uri, id, clock.nanoTime(), properties,
-            settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution());
-    gossipCore = new GossipCore(this, registry);
-    dataReaper = new DataReaper(gossipCore, clock);
-    members = new ConcurrentSkipListMap<>();
-    for (Member startupMember : gossipMembers) {
-      if (!startupMember.equals(me)) {
-        LocalMember member = new LocalMember(startupMember.getClusterName(),
-                startupMember.getUri(), startupMember.getId(),
-                clock.nanoTime(), startupMember.getProperties(), settings.getWindowSize(),
-                settings.getMinimumSamples(), settings.getDistribution());
-        //TODO should members start in down state?
-        members.put(member, GossipState.DOWN);
-      }
-    }
-    gossipThreadExecutor = Executors.newCachedThreadPool();
-    gossipServiceRunning = new AtomicBoolean(true);
-    this.scheduledServiced = Executors.newScheduledThreadPool(1);
-    this.registry = registry;
-    this.ringState = new RingStatePersister(this);
-    this.userDataState = new UserDataPersister(this, this.gossipCore);
-    this.memberStateRefresher = new GossipMemberStateRefresher(members, settings, listener, this::findPerNodeGossipData);
-    this.objectMapper = objectMapper;
-    readSavedRingState();
-    readSavedDataState();
-  }
-
-  public MessageInvoker getMessageInvoker() {
-    return messageInvoker;
-  }
-
-  /**
-   * @return the live membership map itself (not a copy); changes made through it are
-   *     visible to this manager
-   */
-  public ConcurrentSkipListMap<LocalMember, GossipState> getMembers() {
-    return members;
-  }
-
-  public GossipSettings getSettings() {
-    return settings;
-  }
-
-  /**
-   * @return a read only list of members found in the DOWN state.
-   */
-  public List<LocalMember> getDeadMembers() {
-    return membersInState(GossipState.DOWN);
-  }
-
-  /**
-   *
-   * @return a read only list of members found in the UP state
-   */
-  public List<LocalMember> getLiveMembers() {
-    return membersInState(GossipState.UP);
-  }
-
-  /** Snapshots the members currently mapped to {@code state} into an unmodifiable list. */
-  private List<LocalMember> membersInState(GossipState state) {
-    return Collections.unmodifiableList(
-            members.entrySet()
-                    .stream()
-                    .filter(entry -> state.equals(entry.getValue()))
-                    .map(Entry::getKey).collect(Collectors.toList()));
-  }
-
-  public LocalMember getMyself() {
-    return me;
-  }
-
-  /**
-   * Reflectively instantiates the class named by
-   * {@link GossipSettings#getActiveGossipClass()} through its
-   * {@code (GossipManager, GossipCore, MetricRegistry)} constructor.
-   *
-   * @throws RuntimeException wrapping any reflective failure
-   */
-  private AbstractActiveGossiper constructActiveGossiper(){
-    try {
-      Constructor<?> c = Class.forName(settings.getActiveGossipClass())
-              .getConstructor(GossipManager.class, GossipCore.class, MetricRegistry.class);
-      return (AbstractActiveGossiper) c.newInstance(this, gossipCore, registry);
-    } catch (NoSuchMethodException | SecurityException | ClassNotFoundException
-            | InstantiationException | IllegalAccessException | IllegalArgumentException
-            | InvocationTargetException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * Starts the client. Specifically, start the various cycles for this protocol. Start the gossip
-   * thread and start the receiver thread.
-   *
-   * <p>Also schedules the ring-state and user-data persisters every 60 seconds and the
-   * member state refresher every 100 milliseconds.
-   */
-  public void init() {
-    passiveGossipThread = new OnlyProcessReceivedPassiveGossipThread(this, gossipCore);
-    gossipThreadExecutor.execute(passiveGossipThread);
-    activeGossipThread = constructActiveGossiper();
-    activeGossipThread.init();
-    dataReaper.init();
-    scheduledServiced.scheduleAtFixedRate(ringState, 60, 60, TimeUnit.SECONDS);
-    scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, TimeUnit.SECONDS);
-    scheduledServiced.scheduleAtFixedRate(memberStateRefresher, 0, 100, TimeUnit.MILLISECONDS);
-    LOGGER.debug("The GossipManager is started.");
-  }
-
-  /** Adds every member returned by the ring persister as DOWN unless it is already known. */
-  private void readSavedRingState() {
-    for (LocalMember l : ringState.readFromDisk()){
-      LocalMember member = new LocalMember(l.getClusterName(),
-              l.getUri(), l.getId(),
-              clock.nanoTime(), l.getProperties(), settings.getWindowSize(),
-              settings.getMinimumSamples(), settings.getDistribution());
-      members.putIfAbsent(member, GossipState.DOWN);
-    }
-  }
-
-  /** Feeds the per-node and shared data returned by the user-data persister into the core. */
-  private void readSavedDataState() {
-    for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> l : userDataState.readPerNodeFromDisk().entrySet()){
-      for (Entry<String, PerNodeDataMessage> j : l.getValue().entrySet()){
-        gossipCore.addPerNodeData(j.getValue());
-      }
-    }
-    for (Entry<String, SharedDataMessage> l: userDataState.readSharedDataFromDisk().entrySet()){
-      gossipCore.addSharedData(l.getValue());
-    }
-  }
-
-  /**
-   * Shutdown the gossip service.
-   *
-   * <p>If the calling thread is interrupted while waiting for either executor, the rest
-   * of the shutdown sequence still runs and the thread's interrupt flag is restored
-   * before returning, so callers can observe the interruption.
-   */
-  public void shutdown() {
-    gossipServiceRunning.set(false);
-    gossipThreadExecutor.shutdown();
-    gossipCore.shutdown();
-    dataReaper.close();
-    if (passiveGossipThread != null) {
-      passiveGossipThread.shutdown();
-    }
-    if (activeGossipThread != null) {
-      activeGossipThread.shutdown();
-    }
-    // Remember an interruption instead of swallowing it; re-asserting it immediately
-    // would make the second awaitTermination below fail straight away.
-    boolean interrupted = false;
-    try {
-      boolean result = gossipThreadExecutor.awaitTermination(10, TimeUnit.MILLISECONDS);
-      if (!result) {
-        LOGGER.error("executor shutdown timed out");
-      }
-    } catch (InterruptedException e) {
-      LOGGER.error(e);
-      interrupted = true;
-    }
-    gossipThreadExecutor.shutdownNow();
-    scheduledServiced.shutdown();
-    try {
-      scheduledServiced.awaitTermination(1, TimeUnit.SECONDS);
-    } catch (InterruptedException e) {
-      LOGGER.error(e);
-      interrupted = true;
-    }
-    scheduledServiced.shutdownNow();
-    if (interrupted) {
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  /**
-   * Stamps {@code message} with this node's id and hands it to the core as per-node data.
-   *
-   * @throws NullPointerException if the message's key, timestamp or payload is null
-   */
-  public void gossipPerNodeData(PerNodeDataMessage message){
-    requireGossipFields(message.getKey(), message.getTimestamp(), message.getPayload());
-    message.setNodeId(me.getId());
-    gossipCore.addPerNodeData(message);
-  }
-
-  /**
-   * Stamps {@code message} with this node's id and hands it to the core as shared data.
-   *
-   * @throws NullPointerException if the message's key, timestamp or payload is null
-   */
-  public void gossipSharedData(SharedDataMessage message){
-    requireGossipFields(message.getKey(), message.getTimestamp(), message.getPayload());
-    message.setNodeId(me.getId());
-    gossipCore.addSharedData(message);
-  }
-
-  /**
-   * Rejects messages with a missing key, timestamp or payload. Uses
-   * {@link Objects#requireNonNull(Object, String)}: {@code Objects.nonNull} merely returns
-   * a boolean and therefore validates nothing when its result is ignored.
-   */
-  private static void requireGossipFields(Object key, Object timestamp, Object payload) {
-    Objects.requireNonNull(key, "message key must not be null");
-    Objects.requireNonNull(timestamp, "message timestamp must not be null");
-    Objects.requireNonNull(payload, "message payload must not be null");
-  }
-
-  /**
-   * Looks up shared data by key and returns its payload as a {@link Crdt}.
-   *
-   * @return the payload, or {@code null} when the key is unknown or the entry has expired
-   * @throws ClassCastException if the stored payload is not a {@link Crdt}
-   */
-  @SuppressWarnings("rawtypes")
-  public Crdt findCrdt(String key){
-    SharedDataMessage l = gossipCore.getSharedData().get(key);
-    if (l == null || isExpired(l.getExpireAt())){
-      return null;
-    }
-    return (Crdt) l.getPayload();
-  }
-
-  /**
-   * Stamps {@code message} with this node's id and asks the core to merge its CRDT payload.
-   *
-   * @return whatever {@link GossipCore#merge} returns for the message
-   * @throws NullPointerException if the message's key or timestamp is null
-   * @throws IllegalArgumentException if the payload is null or not a {@link Crdt}
-   */
-  @SuppressWarnings("rawtypes")
-  public Crdt merge(SharedDataMessage message){
-    Objects.requireNonNull(message.getKey(), "message key must not be null");
-    Objects.requireNonNull(message.getTimestamp(), "message timestamp must not be null");
-    message.setNodeId(me.getId());
-    // instanceof is false for null, so a missing payload is rejected here as well.
-    if (! (message.getPayload() instanceof Crdt)){
-      throw new IllegalArgumentException("Not a subclass of CRDT " + message.getPayload());
-    }
-    return gossipCore.merge(message);
-  }
-
-  /**
-   * @return the per-node message stored for {@code nodeId} under {@code key}, or
-   *     {@code null} when the node or key is unknown or the entry has expired
-   */
-  public PerNodeDataMessage findPerNodeGossipData(String nodeId, String key){
-    ConcurrentHashMap<String, PerNodeDataMessage> j = gossipCore.getPerNodeData().get(nodeId);
-    if (j == null){
-      return null;
-    }
-    PerNodeDataMessage l = j.get(key);
-    if (l == null || isExpired(l.getExpireAt())){
-      return null;
-    }
-    return l;
-  }
-
-  /**
-   * @return the shared message stored under {@code key}, or {@code null} when the key is
-   *     unknown or the entry has expired
-   */
-  public SharedDataMessage findSharedGossipData(String key){
-    SharedDataMessage l = gossipCore.getSharedData().get(key);
-    if (l == null || isExpired(l.getExpireAt())){
-      return null;
-    }
-    return l;
-  }
-
-  /**
-   * An entry is expired when it carries an expiry time that lies before the manager's
-   * clock; an entry without an expiry time never expires. Shared by all lookups so that
-   * per-node and shared data treat a missing expiry the same way.
-   */
-  private boolean isExpired(Long expireAt) {
-    return expireAt != null && expireAt < clock.currentTimeMillis();
-  }
-
-  public DataReaper getDataReaper() {
-    return dataReaper;
-  }
-
-  public RingStatePersister getRingState() {
-    return ringState;
-  }
-
-  public UserDataPersister getUserDataState() {
-    return userDataState;
-  }
-
-  public GossipMemberStateRefresher getMemberStateRefresher() {
-    return memberStateRefresher;
-  }
-
-  public Clock getClock() {
-    return clock;
-  }
-
-  public ObjectMapper getObjectMapper() {
-    return objectMapper;
-  }
-
-  public MetricRegistry getRegistry() {
-    return registry;
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java b/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
deleted file mode 100644
index b87045b..0000000
--- a/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.
See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.manager;
-
-import com.codahale.metrics.MetricRegistry;
-import com.fasterxml.jackson.core.JsonGenerator.Feature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.gossip.Member;
-import org.apache.gossip.GossipSettings;
-import org.apache.gossip.StartupSettings;
-import org.apache.gossip.crdt.CrdtModule;
-import org.apache.gossip.event.GossipListener;
-import org.apache.gossip.manager.handlers.DefaultMessageInvoker;
-import org.apache.gossip.manager.handlers.MessageInvoker;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Entry point for assembling a {@link GossipManager} through a fluent
- * {@link ManagerBuilder}.
- */
-public class GossipManagerBuilder {
-
-  /** @return a fresh, empty {@link ManagerBuilder} */
-  public static ManagerBuilder newBuilder() {
-    return new ManagerBuilder();
-  }
-
-  /**
-   * Collects the constructor arguments of {@link GossipManager}. {@code id},
-   * {@code cluster}, {@code settings} and {@code uri} are mandatory; every other value
-   * receives a default in {@link #build()} when left unset.
-   */
-  public static final class ManagerBuilder {
-    private String cluster;
-    private URI uri;
-    private String id;
-    private GossipSettings settings;
-    private List<Member> gossipMembers;
-    private GossipListener listener;
-    private MetricRegistry registry;
-    private Map<String,String> properties;
-    private ObjectMapper objectMapper;
-    private MessageInvoker messageInvoker;
-
-    private ManagerBuilder() {}
-
-    /** Throws {@link IllegalArgumentException} carrying {@code message} unless {@code ok}. */
-    private void require(boolean ok, String message) {
-      if (ok) {
-        return;
-      }
-      throw new IllegalArgumentException(message);
-    }
-
-    public ManagerBuilder cluster(String cluster) {
-      this.cluster = cluster;
-      return this;
-    }
-
-    public ManagerBuilder properties(Map<String,String> properties) {
-      this.properties = properties;
-      return this;
-    }
-
-    public ManagerBuilder id(String id) {
-      this.id = id;
-      return this;
-    }
-
-    public ManagerBuilder gossipSettings(GossipSettings settings) {
-      this.settings = settings;
-      return this;
-    }
-
-    /**
-     * Copies cluster, id, gossip settings, gossip members and uri from
-     * {@code startupSettings}, overwriting any values set earlier.
-     */
-    public ManagerBuilder startupSettings(StartupSettings startupSettings) {
-      this.cluster = startupSettings.getCluster();
-      this.id = startupSettings.getId();
-      this.settings = startupSettings.getGossipSettings();
-      this.gossipMembers = startupSettings.getGossipMembers();
-      this.uri = startupSettings.getUri();
-      return this;
-    }
-
-    public ManagerBuilder gossipMembers(List<Member> members) {
-      this.gossipMembers = members;
-      return this;
-    }
-
-    public ManagerBuilder listener(GossipListener listener) {
-      this.listener = listener;
-      return this;
-    }
-
-    public ManagerBuilder registry(MetricRegistry registry) {
-      this.registry = registry;
-      return this;
-    }
-
-    public ManagerBuilder uri(URI uri){
-      this.uri = uri;
-      return this;
-    }
-
-    public ManagerBuilder mapper(ObjectMapper objectMapper){
-      this.objectMapper = objectMapper;
-      return this;
-    }
-
-    public ManagerBuilder messageInvoker(MessageInvoker messageInvoker) {
-      this.messageInvoker = messageInvoker;
-      return this;
-    }
-
-    /**
-     * Validates the mandatory values, fills in defaults for the optional ones and
-     * creates the manager as an anonymous subclass of {@link GossipManager}.
-     *
-     * @throws IllegalArgumentException if id, cluster, settings or uri is missing
-     *     (checked in that order)
-     */
-    public GossipManager build() {
-      require(id != null, "You must specify an id");
-      require(cluster != null, "You must specify a cluster name");
-      require(settings != null, "You must specify gossip settings");
-      require(uri != null, "You must specify a uri");
-      fillInDefaults();
-      return new GossipManager(cluster, uri, id, properties, settings, gossipMembers,
-              listener, registry, objectMapper, messageInvoker) {};
-    }
-
-    /** Replaces every optional value that is still null with its default. */
-    private void fillInDefaults() {
-      if (registry == null) {
-        registry = new MetricRegistry();
-      }
-      if (properties == null) {
-        properties = new HashMap<>();
-      }
-      if (listener == null) {
-        // No-op listener so later code never has to null-check it.
-        listener = (a, b) -> {};
-      }
-      if (gossipMembers == null) {
-        gossipMembers = new ArrayList<>();
-      }
-      if (objectMapper == null) {
-        objectMapper = newDefaultMapper();
-      }
-      if (messageInvoker == null) {
-        messageInvoker = new DefaultMessageInvoker();
-      }
-    }
-
-    /**
-     * Creates the mapper used when the caller supplied none.
-     *
-     * <p>NOTE(review): {@code enableDefaultTyping()} switches on Jackson's polymorphic
-     * type handling, which is a known risk when deserializing untrusted input. It is
-     * kept as-is here because the message serialization presumably relies on it --
-     * confirm before changing.
-     */
-    private static ObjectMapper newDefaultMapper() {
-      ObjectMapper mapper = new ObjectMapper();
-      mapper.enableDefaultTyping();
-      mapper.registerModule(new CrdtModule());
-      mapper.configure(Feature.WRITE_NUMBERS_AS_STRINGS, false);
-      return mapper;
-    }
-  }
-
-}