Repository: incubator-gossip Updated Branches: refs/heads/master 19662d150 -> 2622248e7
GOSSIP-63 Added CRDT G-Counter implementation Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/da1aba9c Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/da1aba9c Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/da1aba9c Branch: refs/heads/master Commit: da1aba9c71a8e26dcf505482a35fb605d9de3977 Parents: 22b9e75 Author: Mirage Abeysekara <mirage...@cse.mrt.ac.lk> Authored: Wed Mar 8 10:21:03 2017 +0530 Committer: Mirage Abeysekara <mirage...@cse.mrt.ac.lk> Committed: Thu Mar 9 13:44:49 2017 +0530 ---------------------------------------------------------------------- .../org/apache/gossip/crdt/CrdtCounter.java | 24 ++++ .../java/org/apache/gossip/crdt/CrdtModule.java | 7 ++ .../org/apache/gossip/crdt/GrowOnlyCounter.java | 119 +++++++++++++++++++ src/test/java/org/apache/gossip/DataTest.java | 53 ++++++++- .../apache/gossip/crdt/GrowOnlyCounterTest.java | 54 +++++++++ 5 files changed, 254 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/da1aba9c/src/main/java/org/apache/gossip/crdt/CrdtCounter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/crdt/CrdtCounter.java b/src/main/java/org/apache/gossip/crdt/CrdtCounter.java new file mode 100644 index 0000000..cdc9445 --- /dev/null +++ b/src/main/java/org/apache/gossip/crdt/CrdtCounter.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.crdt; + +public interface CrdtCounter<ValueType extends Number, R extends CrdtCounter<ValueType, R>> + extends Crdt<ValueType, R> { + +} + http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/da1aba9c/src/main/java/org/apache/gossip/crdt/CrdtModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/crdt/CrdtModule.java b/src/main/java/org/apache/gossip/crdt/CrdtModule.java index 0c8a787..cfb3f47 100644 --- a/src/main/java/org/apache/gossip/crdt/CrdtModule.java +++ b/src/main/java/org/apache/gossip/crdt/CrdtModule.java @@ -42,6 +42,12 @@ abstract class GrowOnlySetMixin<E>{ @JsonIgnore abstract boolean isEmpty(); } +abstract class GrowOnlyCounterMixin { + @JsonCreator + GrowOnlyCounterMixin(@JsonProperty("counters") Map<String, Long> counters) { } + @JsonProperty("counters") abstract Map<String, Long> getCounters(); +} + //If anyone wants to take a stab at this. please have at it //https://github.com/FasterXML/jackson-datatype-guava/blob/master/src/main/java/com/fasterxml/jackson/datatype/guava/ser/MultimapSerializer.java public class CrdtModule extends SimpleModule { @@ -56,6 +62,7 @@ public class CrdtModule extends SimpleModule { public void setupModule(SetupContext context) { context.setMixInAnnotations(OrSet.class, OrSetMixin.class); context.setMixInAnnotations(GrowOnlySet.class, GrowOnlySetMixin.class); + context.setMixInAnnotations(GrowOnlyCounter.class, GrowOnlyCounterMixin.class); } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/da1aba9c/src/main/java/org/apache/gossip/crdt/GrowOnlyCounter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/crdt/GrowOnlyCounter.java b/src/main/java/org/apache/gossip/crdt/GrowOnlyCounter.java new file mode 100644 index 0000000..9156142 --- /dev/null +++ b/src/main/java/org/apache/gossip/crdt/GrowOnlyCounter.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gossip.crdt; + +import org.apache.gossip.manager.GossipManager; + +import java.util.HashMap; +import java.util.Map; + +public class GrowOnlyCounter implements CrdtCounter<Long, GrowOnlyCounter> { + + private final Map<String, Long> counters = new HashMap<>(); + + GrowOnlyCounter(Map<String, Long> counters) { + this.counters.putAll(counters); + } + + public GrowOnlyCounter(GrowOnlyCounter growOnlyCounter, Builder builder) { + counters.putAll(growOnlyCounter.counters); + if (counters.containsKey(builder.myId)) { + Long newValue = counters.get(builder.myId) + builder.counter; + counters.replace(builder.myId, newValue); + } else { + counters.put(builder.myId, builder.counter); + } + } + + public GrowOnlyCounter(Builder builder) { + counters.put(builder.myId, builder.counter); + } + + public GrowOnlyCounter(GossipManager manager) { + counters.put(manager.getMyself().getId(), 0L); + } + + public GrowOnlyCounter(GrowOnlyCounter growOnlyCounter, GrowOnlyCounter other) { + counters.putAll(growOnlyCounter.counters); + for (Map.Entry<String, Long> entry : other.counters.entrySet()) { + String otherKey = entry.getKey(); + Long otherValue = entry.getValue(); + + if (counters.containsKey(otherKey)) { + Long newValue = Math.max(counters.get(otherKey), otherValue); + counters.replace(otherKey, newValue); + } else { + counters.put(otherKey, otherValue); + } + } + } + + @Override + public GrowOnlyCounter merge(GrowOnlyCounter other) { + return new GrowOnlyCounter(this, other); + } + + @Override + public Long value() { + Long globalCount = 0L; + for (Long increment : counters.values()) { + globalCount += increment; + } + return globalCount; + } + + @Override + public GrowOnlyCounter optimize() { + return new GrowOnlyCounter(counters); + } + + @Override + public boolean equals(Object obj) { + if (getClass() != obj.getClass()) + return false; + GrowOnlyCounter other = (GrowOnlyCounter) obj; + return value().longValue() == other.value().longValue(); + } + + @Override + public String toString() { + return "GrowOnlyCounter [counters= " + counters + ", Value=" + value() + "]"; + } + + Map<String, Long> getCounters() { + return counters; + } + + public static class Builder { + + private final String myId; + + private Long counter; + + public Builder(GossipManager gossipManager) { + myId = gossipManager.getMyself().getId(); + counter = 0L; + } + + public GrowOnlyCounter.Builder increment(Integer count) { + counter += count; + return this; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/da1aba9c/src/test/java/org/apache/gossip/DataTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/gossip/DataTest.java b/src/test/java/org/apache/gossip/DataTest.java index 3892e9b..a8d57a7 100644 --- a/src/test/java/org/apache/gossip/DataTest.java +++ b/src/test/java/org/apache/gossip/DataTest.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; +import org.apache.gossip.crdt.GrowOnlyCounter; import org.apache.gossip.crdt.GrowOnlySet; import org.apache.gossip.crdt.OrSet; import org.apache.gossip.model.GossipDataMessage; @@ -39,6 +40,7 @@ import io.teknek.tunit.TUnit; public class DataTest { private String orSetKey = "cror"; + private String gCounterKey = "crdtgc"; @Test public void dataTest() throws InterruptedException, UnknownHostException, URISyntaxException{ @@ -72,7 +74,7 @@ public class DataTest { clients.get(0).gossipPerNodeData(msg()); clients.get(0).gossipSharedData(sharedMsg()); - TUnit.assertThat(()-> { + TUnit.assertThat(()-> { GossipDataMessage x = clients.get(1).findPerNodeData(1 + "", "a"); if (x == null) return ""; @@ -80,7 +82,7 @@ public class DataTest { return x.getPayload(); }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("b"); - TUnit.assertThat(() -> { + TUnit.assertThat(() -> { SharedGossipDataMessage x = clients.get(1).findSharedData("a"); if (x == null) return ""; @@ -96,11 +98,49 @@ public class DataTest { dropIt(clients); assertThatOrSetDelIsMerged(clients); + // test g counter + givenDifferentIncrement(clients); + assertThatCountIsUpdated(clients, 3); + givenIncreaseOther(clients); + assertThatCountIsUpdated(clients, 7); + for (int i = 0; i < clusterMembers; ++i) { clients.get(i).shutdown(); } } + private void givenDifferentIncrement(final List<GossipService> clients) { + { + SharedGossipDataMessage d = new SharedGossipDataMessage(); + d.setKey(gCounterKey); + d.setPayload(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(0).getGossipManager()).increment(1))); + d.setExpireAt(Long.MAX_VALUE); + d.setTimestamp(System.currentTimeMillis()); + clients.get(0).getGossipManager().merge(d); + } + { + SharedGossipDataMessage d = new SharedGossipDataMessage(); + d.setKey(gCounterKey); + d.setPayload(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(1).getGossipManager()).increment(2))); + d.setExpireAt(Long.MAX_VALUE); + d.setTimestamp(System.currentTimeMillis()); + clients.get(1).getGossipManager().merge(d); + } + } + + private void givenIncreaseOther(final List<GossipService> clients) { + GrowOnlyCounter gc = (GrowOnlyCounter) clients.get(1).getGossipManager().findCrdt(gCounterKey); + GrowOnlyCounter gc2 = new GrowOnlyCounter(gc, + new GrowOnlyCounter.Builder(clients.get(1).getGossipManager()).increment(4)); + + SharedGossipDataMessage d = new SharedGossipDataMessage(); + d.setKey(gCounterKey); + d.setPayload(gc2); + d.setExpireAt(Long.MAX_VALUE); + d.setTimestamp(System.currentTimeMillis()); + clients.get(1).getGossipManager().merge(d); + } + private void givenOrs(List<GossipService> clients) { { SharedGossipDataMessage d = new SharedGossipDataMessage(); @@ -152,6 +192,13 @@ public class DataTest { clients.get(1).getGossipManager().merge(CrdtMessage("2")); } + private void assertThatCountIsUpdated(final List<GossipService> clients, int finalCount) { + TUnit.assertThat(() -> { + return clients.get(0).getGossipManager().findCrdt(gCounterKey); + }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new GrowOnlyCounter( + new GrowOnlyCounter.Builder(clients.get(0).getGossipManager()).increment(finalCount))); + } + private void assertThatListIsMerged(final List<GossipService> clients){ TUnit.assertThat(() -> { return clients.get(0).getGossipManager().findCrdt("cr"); @@ -164,7 +211,7 @@ public class DataTest { d.setPayload(new GrowOnlySet<String>( Arrays.asList(item))); d.setExpireAt(Long.MAX_VALUE); d.setTimestamp(System.currentTimeMillis()); - return d; + return d; } private GossipDataMessage msg(){ http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/da1aba9c/src/test/java/org/apache/gossip/crdt/GrowOnlyCounterTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/gossip/crdt/GrowOnlyCounterTest.java b/src/test/java/org/apache/gossip/crdt/GrowOnlyCounterTest.java new file mode 100644 index 0000000..3a134af --- /dev/null +++ b/src/test/java/org/apache/gossip/crdt/GrowOnlyCounterTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.crdt; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class GrowOnlyCounterTest { + + @Test + public void mergeTest() { + + Map<String, Long> node1Counter = new HashMap<>(); + node1Counter.put("1", 3L); + Map<String, Long> node2Counter = new HashMap<>(); + node2Counter.put("2", 1L); + Map<String, Long> node3Counter = new HashMap<>(); + node3Counter.put("3", 2L); + + GrowOnlyCounter gCounter1 = new GrowOnlyCounter(node1Counter); + GrowOnlyCounter gCounter2 = new GrowOnlyCounter(node2Counter); + GrowOnlyCounter gCounter3 = new GrowOnlyCounter(node3Counter); + + // After node 2 receive from node 1 + gCounter2 = gCounter2.merge(gCounter1); + Assert.assertEquals(4, (long) gCounter2.value()); + + // After node 3 receive from node 1 + gCounter3 = gCounter3.merge(gCounter1); + Assert.assertEquals(5, (long) gCounter3.value()); + + // After node 3 receive from node 2 + gCounter3 = gCounter3.merge(gCounter2); + Assert.assertEquals(6, (long) gCounter3.value()); + } +}