Repository: incubator-gossip Updated Branches: refs/heads/master 9c9d96e56 -> 602a79bfc
GOSSIP-65 Implement crdt LWW-Element-Set LWWSet implemented + se/de + unit tests + jackson tests + DataTests Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/602a79bf Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/602a79bf Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/602a79bf Branch: refs/heads/master Commit: 602a79bfc6ff964c8a8905fab3824e6f3203838d Parents: 9c9d96e Author: Maxim Rusak <[email protected]> Authored: Mon Jun 19 22:56:36 2017 +0300 Committer: Maxim Rusak <[email protected]> Committed: Mon Jun 19 22:56:36 2017 +0300 ---------------------------------------------------------------------- .../java/org/apache/gossip/crdt/CrdtModule.java | 15 ++ .../java/org/apache/gossip/crdt/LWWSet.java | 152 ++++++++++++ .../java/org/apache/gossip/crdt/LWWSetTest.java | 155 ++++++++++++ .../test/java/org/apache/gossip/DataTest.java | 233 ++++++++----------- .../gossip/protocol/json/JacksonTest.java | 17 ++ 5 files changed, 440 insertions(+), 132 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/602a79bf/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java index cfb3f47..bb1a052 100644 --- a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java +++ b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java @@ -35,6 +35,19 @@ abstract class OrSetMixin<E> { @JsonIgnore abstract boolean isEmpty(); } +abstract class LWWSetMixin<ElementType> { + @JsonCreator + LWWSetMixin(@JsonProperty("data") Map<ElementType, LWWSet.Timestamps> struct) { } + @JsonProperty("data") abstract Map<ElementType, LWWSet.Timestamps> getStruct(); +} + +abstract class LWWSetTimestampsMixin { + @JsonCreator + LWWSetTimestampsMixin(@JsonProperty("add") long latestAdd, @JsonProperty("remove") long latestRemove) { } + @JsonProperty("add") abstract long getLatestAdd(); + @JsonProperty("remove") abstract long getLatestRemove(); +} + abstract class GrowOnlySetMixin<E>{ @JsonCreator GrowOnlySetMixin(@JsonProperty("elements") Set<E> elements){ } @@ -63,6 +76,8 @@ public class CrdtModule extends SimpleModule { context.setMixInAnnotations(OrSet.class, OrSetMixin.class); context.setMixInAnnotations(GrowOnlySet.class, GrowOnlySetMixin.class); context.setMixInAnnotations(GrowOnlyCounter.class, GrowOnlyCounterMixin.class); + context.setMixInAnnotations(LWWSet.class, LWWSetMixin.class); + context.setMixInAnnotations(LWWSet.Timestamps.class, LWWSetTimestampsMixin.class); } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/602a79bf/gossip-base/src/main/java/org/apache/gossip/crdt/LWWSet.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/LWWSet.java b/gossip-base/src/main/java/org/apache/gossip/crdt/LWWSet.java new file mode 100644 index 0000000..b51ce7a --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/crdt/LWWSet.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.crdt; + +import org.apache.gossip.manager.Clock; +import org.apache.gossip.manager.SystemClock; + +import java.util.*; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class LWWSet<ElementType> implements CrdtSet<ElementType, Set<ElementType>, LWWSet<ElementType>> { + static private Clock clock = new SystemClock(); + + private final Map<ElementType, Timestamps> struct; + + static class Timestamps { + private final long latestAdd; + private final long latestRemove; + + Timestamps(){ + latestAdd = 0; + latestRemove = 0; + } + + Timestamps(long add, long remove){ + latestAdd = add; + latestRemove = remove; + } + + long getLatestAdd() { + return latestAdd; + } + + long getLatestRemove() { + return latestRemove; + } + + // consider element present when addTime >= removeTime, so we prefer add to remove + boolean isPresent(){ + return latestAdd >= latestRemove; + } + + Timestamps updateAdd(){ + return new Timestamps(clock.nanoTime(), latestRemove); + } + + Timestamps updateRemove(){ + return new Timestamps(latestAdd, clock.nanoTime()); + } + + Timestamps merge(Timestamps other){ + if (other == null){ + return this; + } + return new Timestamps(Math.max(latestAdd, other.latestAdd), Math.max(latestRemove, other.latestRemove)); + } + } + + + public LWWSet(){ + struct = new HashMap<>(); + } + + @SafeVarargs + public LWWSet(ElementType... elements){ + this(new HashSet<>(Arrays.asList(elements))); + } + + public LWWSet(Set<ElementType> set){ + struct = new HashMap<>(); + for (ElementType e : set){ + struct.put(e, new Timestamps().updateAdd()); + } + } + + public LWWSet(LWWSet<ElementType> first, LWWSet<ElementType> second){ + Function<ElementType, Timestamps> timestampsFor = p -> { + Timestamps firstTs = first.struct.get(p); + Timestamps secondTs = second.struct.get(p); + if (firstTs == null){ + return secondTs; + } + return firstTs.merge(secondTs); + }; + struct = Stream.concat(first.struct.keySet().stream(), second.struct.keySet().stream()) + .distinct().collect(Collectors.toMap(p -> p, timestampsFor)); + } + + public LWWSet<ElementType> add(ElementType e){ + return this.merge(new LWWSet<>(e)); + } + + // for serialization + LWWSet(Map<ElementType, Timestamps> struct){ + this.struct = struct; + } + + Map<ElementType, Timestamps> getStruct() { + return struct; + } + + + public LWWSet<ElementType> remove(ElementType e){ + Timestamps eTimestamps = struct.get(e); + if (eTimestamps == null || !eTimestamps.isPresent()){ + return this; + } + Map<ElementType, Timestamps> changeMap = new HashMap<>(); + changeMap.put(e, eTimestamps.updateRemove()); + return this.merge(new LWWSet<>(changeMap)); + } + + @Override + public LWWSet<ElementType> merge(LWWSet<ElementType> other){ + return new LWWSet<>(this, other); + } + + @Override + public Set<ElementType> value(){ + return struct.entrySet().stream() + .filter(entry -> entry.getValue().isPresent()) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + } + + @Override + public LWWSet<ElementType> optimize(){ + return this; + } + + @Override + public boolean equals(Object obj){ + return this == obj || (obj != null && getClass() == obj.getClass() && value().equals(((LWWSet) obj).value())); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/602a79bf/gossip-base/src/test/java/org/apache/gossip/crdt/LWWSetTest.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/LWWSetTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/LWWSetTest.java new file mode 100644 index 0000000..bdd3258 --- /dev/null +++ b/gossip-base/src/test/java/org/apache/gossip/crdt/LWWSetTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.crdt; + +import org.apache.gossip.manager.Clock; +import org.apache.gossip.manager.SystemClock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class LWWSetTest { + static private Clock clock = new SystemClock(); + private Set<Integer> sampleSet; + + @Before + public void setup(){ + sampleSet = new HashSet<>(); + sampleSet.add(4); + sampleSet.add(5); + sampleSet.add(12); + } + + @Test + public void setConstructorTest(){ + Assert.assertEquals(new LWWSet<>(sampleSet).value(), sampleSet); + } + + @Test + public void stressWithSetTest(){ + Set<Integer> set = new HashSet<>(); + LWWSet<Integer> lww = new LWWSet<>(); + for (int it = 0; it < 100; it++){ + LWWSet<Integer> newLww; + if (it % 5 == 1){ + //deleting existing + Integer forDelete = set.stream().skip((long) (set.size() * Math.random())).findFirst().get(); + newLww = lww.remove(forDelete); + Assert.assertEquals(lww.value(), set); // check old version is immutable + set.remove(forDelete); + } else { + //adding + Integer forAdd = (int) (10000 * Math.random()); + newLww = lww.add(forAdd); + Assert.assertEquals(lww.value(), set); // check old version is immutable + set.add(forAdd); + } + lww = newLww; + Assert.assertEquals(lww.value(), set); + } + } + + @Test + public void equalsTest(){ + LWWSet<Integer> lww = new LWWSet<>(sampleSet); + Assert.assertFalse(lww.equals(sampleSet)); + LWWSet<Integer> newLww = lww.add(25); + sampleSet.add(25); + Assert.assertFalse(newLww.equals(lww)); + Assert.assertEquals(new LWWSet<>(sampleSet), newLww); + } + + @Test + public void valueTest() { + Map<Character, LWWSet.Timestamps> map = new HashMap<>(); + map.put('a', new LWWSet.Timestamps(1, 0)); + map.put('b', new LWWSet.Timestamps(1, 2)); + map.put('c', new LWWSet.Timestamps(3, 3)); + Set<Character> toTest = new HashSet<>(); + toTest.add('a'); // for 'a' addTime > removeTime + toTest.add('c'); // for 'c' times are equal, we prefer add to remove + Assert.assertEquals(new LWWSet<>(map).value(), toTest); + Assert.assertEquals(new LWWSet<>(map), new LWWSet<>('a', 'c')); + } + + @Test + public void removeMissingTest(){ + LWWSet<Integer> lww = new LWWSet<>(sampleSet); + lww = lww.add(25); + lww = lww.remove(25); + Assert.assertEquals(lww.value(), sampleSet); + lww = lww.remove(25); + lww = lww.add(25); + sampleSet.add(25); + Assert.assertEquals(lww.value(), sampleSet); + } + + @Test + public void stressMergeTest(){ + // in one-process context, add, remove and merge operations of lww are equal to operations of Set + // we've already checked it. Now just check merge + Set<Integer> set1 = new HashSet<>(), set2 = new HashSet<>(); + LWWSet<Integer> lww1 = new LWWSet<>(), lww2 = new LWWSet<>(); + + for (int it = 0; it < 100; it++){ + Integer forAdd = (int) (10000 * Math.random()); + if (it % 2 == 0){ + set1.add(forAdd); + lww1 = lww1.add(forAdd); + } else { + set2.add(forAdd); + lww2 = lww2.add(forAdd); + } + } + Assert.assertEquals(lww1.value(), set1); + Assert.assertEquals(lww2.value(), set2); + Set<Integer> mergedSet = Stream.concat(set1.stream(), set2.stream()).collect(Collectors.toSet()); + Assert.assertEquals(lww1.merge(lww2).value(), mergedSet); + } + + @Test + public void fakeTimeMergeTest(){ + // try to create LWWSet with time from future (simulate other process with its own clock) and validate result + // check remove from the future + Map<Integer, LWWSet.Timestamps> map = new HashMap<>(); + map.put(25, new LWWSet.Timestamps(clock.nanoTime(), clock.nanoTime() + 100000)); + LWWSet<Integer> lww = new LWWSet<>(map); + Assert.assertEquals(lww, new LWWSet<Integer>()); + //create new LWWSet with element 25, and merge with other LWW which has remove in future + Assert.assertEquals(new LWWSet<>(25).merge(lww), new LWWSet<Integer>()); + + // add in future + map.put(25, new LWWSet.Timestamps(clock.nanoTime() + 100000, 0)); + lww = new LWWSet<>(map); + lww = lww.remove(25); + Assert.assertEquals(lww, new LWWSet<>(25)); // 25 is still here + } + + @Test + public void optimizeTest(){ + Assert.assertEquals(new LWWSet<>(sampleSet).value(), sampleSet); + Assert.assertEquals(new LWWSet<>(sampleSet).optimize().value(), sampleSet); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/602a79bf/gossip-itest/src/test/java/org/apache/gossip/DataTest.java ---------------------------------------------------------------------- diff --git a/gossip-itest/src/test/java/org/apache/gossip/DataTest.java b/gossip-itest/src/test/java/org/apache/gossip/DataTest.java index 9fe9aa9..53408f8 100644 --- a/gossip-itest/src/test/java/org/apache/gossip/DataTest.java +++ b/gossip-itest/src/test/java/org/apache/gossip/DataTest.java @@ -17,222 +17,191 @@ */ package org.apache.gossip; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - +import io.teknek.tunit.TUnit; import org.apache.gossip.crdt.GrowOnlyCounter; import org.apache.gossip.crdt.GrowOnlySet; +import org.apache.gossip.crdt.LWWSet; import org.apache.gossip.crdt.OrSet; import org.apache.gossip.manager.GossipManager; import org.apache.gossip.manager.GossipManagerBuilder; import org.apache.gossip.model.PerNodeDataMessage; import org.apache.gossip.model.SharedDataMessage; +import org.junit.Assert; import org.junit.Test; -import io.teknek.tunit.TUnit; - +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.*; +import java.util.concurrent.TimeUnit; + public class DataTest extends AbstractIntegrationBase { - + private String orSetKey = "cror"; + private String lwwSetKey = "crlww"; private String gCounterKey = "crdtgc"; - + @Test public void dataTest() throws InterruptedException, UnknownHostException, URISyntaxException{ GossipSettings settings = new GossipSettings(); - settings.setPersistRingState(false); + settings.setPersistRingState(false); settings.setPersistDataState(false); String cluster = UUID.randomUUID().toString(); int seedNodes = 1; List<Member> startupMembers = new ArrayList<>(); - for (int i = 1; i < seedNodes+1; ++i) { + for (int i = 1; i < seedNodes + 1; ++i){ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); startupMembers.add(new RemoteMember(cluster, uri, i + "")); } final List<GossipManager> clients = new ArrayList<>(); final int clusterMembers = 2; - for (int i = 1; i < clusterMembers + 1; ++i) { + for (int i = 1; i < clusterMembers + 1; ++i){ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri) - .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build(); + .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build(); clients.add(gossipService); gossipService.init(); register(gossipService); } TUnit.assertThat(() -> { int total = 0; - for (int i = 0; i < clusterMembers; ++i) { + for (int i = 0; i < clusterMembers; ++i){ total += clients.get(i).getLiveMembers().size(); } return total; }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2); - clients.get(0).gossipPerNodeData(msg()); - clients.get(0).gossipSharedData(sharedMsg()); + clients.get(0).gossipPerNodeData(generatePerNodeMsg("a", "b")); + clients.get(0).gossipSharedData(generateSharedMsg("a", "c")); - TUnit.assertThat(()-> { + TUnit.assertThat(() -> { PerNodeDataMessage x = clients.get(1).findPerNodeGossipData(1 + "", "a"); if (x == null) return ""; else return x.getPayload(); }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("b"); - - TUnit.assertThat(() -> { + + TUnit.assertThat(() -> { SharedDataMessage x = clients.get(1).findSharedGossipData("a"); if (x == null) return ""; else return x.getPayload(); }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("c"); - - + + givenDifferentDatumsInSet(clients); assertThatListIsMerged(clients); - - givenOrs(clients); - assertThatOrSetIsMerged(clients); - dropIt(clients); - assertThatOrSetDelIsMerged(clients); - + testOrSet(clients); + testLWWSet(clients); + // test g counter givenDifferentIncrement(clients); assertThatCountIsUpdated(clients, 3); givenIncreaseOther(clients); assertThatCountIsUpdated(clients, 7); - for (int i = 0; i < clusterMembers; ++i) { + for (int i = 0; i < clusterMembers; ++i){ clients.get(i).shutdown(); } } - - private void givenDifferentIncrement(final List<GossipManager> clients) { - { - SharedDataMessage d = new SharedDataMessage(); - d.setKey(gCounterKey); - d.setPayload(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(0)).increment(1L))); - d.setExpireAt(Long.MAX_VALUE); - d.setTimestamp(System.currentTimeMillis()); - clients.get(0).merge(d); - } - { - SharedDataMessage d = new SharedDataMessage(); - d.setKey(gCounterKey); - d.setPayload(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(1)).increment(2L))); - d.setExpireAt(Long.MAX_VALUE); - d.setTimestamp(System.currentTimeMillis()); - clients.get(1).merge(d); - } - } - private void givenIncreaseOther(final List<GossipManager> clients) { - GrowOnlyCounter gc = (GrowOnlyCounter) clients.get(1).findCrdt(gCounterKey); - GrowOnlyCounter gc2 = new GrowOnlyCounter(gc, - new GrowOnlyCounter.Builder(clients.get(1)).increment(4L)); + private void testOrSet(final List<GossipManager> clients){ + //populate + clients.get(0).merge(generateSharedMsg(orSetKey, new OrSet<>("1", "2"))); + clients.get(1).merge(generateSharedMsg(orSetKey, new OrSet<>("3", "4"))); - SharedDataMessage d = new SharedDataMessage(); - d.setKey(gCounterKey); - d.setPayload(gc2); - d.setExpireAt(Long.MAX_VALUE); - d.setTimestamp(System.currentTimeMillis()); - clients.get(1).merge(d); - } + //assert merge + assertMerged(clients.get(0), orSetKey, new OrSet<>("1", "2", "3", "4").value()); + assertMerged(clients.get(1), orSetKey, new OrSet<>("1", "2", "3", "4").value()); - private void givenOrs(List<GossipManager> clients) { - { - SharedDataMessage d = new SharedDataMessage(); - d.setKey(orSetKey); - d.setPayload(new OrSet<String>("1", "2")); - d.setExpireAt(Long.MAX_VALUE); - d.setTimestamp(System.currentTimeMillis()); - clients.get(0).merge(d); - } - { - SharedDataMessage d = new SharedDataMessage(); - d.setKey(orSetKey); - d.setPayload(new OrSet<String>("3", "4")); - d.setExpireAt(Long.MAX_VALUE); - d.setTimestamp(System.currentTimeMillis()); - clients.get(1).merge(d); - } - } - - private void dropIt(List<GossipManager> clients) { + //drop element @SuppressWarnings("unchecked") OrSet<String> o = (OrSet<String>) clients.get(0).findCrdt(orSetKey); - OrSet<String> o2 = new OrSet<String>(o, new OrSet.Builder<String>().remove("3")); - SharedDataMessage d = new SharedDataMessage(); - d.setKey(orSetKey); - d.setPayload(o2); - d.setExpireAt(Long.MAX_VALUE); - d.setTimestamp(System.currentTimeMillis()); - clients.get(0).merge(d); + OrSet<String> o2 = new OrSet<>(o, new OrSet.Builder<String>().remove("3")); + clients.get(0).merge(generateSharedMsg(orSetKey, o2)); + + //assert deletion + assertMerged(clients.get(0), orSetKey, new OrSet<>("1", "2", "4").value()); + assertMerged(clients.get(1), orSetKey, new OrSet<>("1", "2", "4").value()); + } + + private void testLWWSet(final List<GossipManager> clients){ + //populate + clients.get(0).merge(generateSharedMsg(lwwSetKey, new LWWSet<>("1", "2"))); + clients.get(1).merge(generateSharedMsg(lwwSetKey, new LWWSet<>("3", "4"))); + + //assert merge + assertMerged(clients.get(0), lwwSetKey, new LWWSet<>("1", "2", "3", "4").value()); + assertMerged(clients.get(1), lwwSetKey, new LWWSet<>("1", "2", "3", "4").value()); + + //drop element + @SuppressWarnings("unchecked") + LWWSet<String> lww = (LWWSet<String>) clients.get(0).findCrdt(lwwSetKey); + clients.get(0).merge(generateSharedMsg(lwwSetKey, lww.remove("3"))); + + //assert deletion + assertMerged(clients.get(0), lwwSetKey, new OrSet<>("1", "2", "4").value()); + assertMerged(clients.get(1), lwwSetKey, new OrSet<>("1", "2", "4").value()); + } + + private void givenDifferentIncrement(final List<GossipManager> clients){ + Object payload = new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(0)).increment(1L)); + clients.get(0).merge(generateSharedMsg(gCounterKey, payload)); + payload = new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(1)).increment(2L)); + clients.get(1).merge(generateSharedMsg(gCounterKey, payload)); } - - private void assertThatOrSetIsMerged(final List<GossipManager> clients){ - TUnit.assertThat(() -> { - return clients.get(0).findCrdt(orSetKey).value(); - }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new OrSet<String>("1", "2", "3", "4").value()); - TUnit.assertThat(() -> { - return clients.get(1).findCrdt(orSetKey).value(); - }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new OrSet<String>("1", "2", "3", "4").value()); + + private void givenIncreaseOther(final List<GossipManager> clients){ + GrowOnlyCounter gc = (GrowOnlyCounter) clients.get(1).findCrdt(gCounterKey); + GrowOnlyCounter gc2 = new GrowOnlyCounter(gc, + new GrowOnlyCounter.Builder(clients.get(1)).increment(4L)); + + clients.get(1).merge(generateSharedMsg(gCounterKey, gc2)); } - - private void assertThatOrSetDelIsMerged(final List<GossipManager> clients){ - TUnit.assertThat(() -> { - return clients.get(0).findCrdt(orSetKey); - }).afterWaitingAtMost(10, TimeUnit.SECONDS).equals(new OrSet<String>("1", "2", "4")); + + private void assertMerged(final GossipManager client, String key, final Set<String> expected){ + TUnit.assertThat(() -> client.findCrdt(key).value()) + .afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(expected); } private void givenDifferentDatumsInSet(final List<GossipManager> clients){ clients.get(0).merge(CrdtMessage("1")); clients.get(1).merge(CrdtMessage("2")); } - - private void assertThatCountIsUpdated(final List<GossipManager> clients, long finalCount) { - TUnit.assertThat(() -> { - return clients.get(0).findCrdt(gCounterKey); - }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new GrowOnlyCounter( - new GrowOnlyCounter.Builder(clients.get(0)).increment(finalCount))); + + private void assertThatCountIsUpdated(final List<GossipManager> clients, long finalCount){ + TUnit.assertThat(() -> clients.get(0).findCrdt(gCounterKey)) + .afterWaitingAtMost(10, TimeUnit.SECONDS) + .isEqualTo(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(0)).increment(finalCount))); } private void assertThatListIsMerged(final List<GossipManager> clients){ - TUnit.assertThat(() -> { - return clients.get(0).findCrdt("cr"); - }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new GrowOnlySet<String>(Arrays.asList("1","2"))); + TUnit.assertThat(() -> clients.get(0).findCrdt("cr")) + .afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new GrowOnlySet<>(Arrays.asList("1", "2"))); } - + private SharedDataMessage CrdtMessage(String item){ - SharedDataMessage d = new SharedDataMessage(); - d.setKey("cr"); - d.setPayload(new GrowOnlySet<String>( Arrays.asList(item))); - d.setExpireAt(Long.MAX_VALUE); - d.setTimestamp(System.currentTimeMillis()); - return d; + return generateSharedMsg("cr", new GrowOnlySet<>(Arrays.asList(item))); } - - private PerNodeDataMessage msg(){ + + private PerNodeDataMessage generatePerNodeMsg(String key, Object payload){ PerNodeDataMessage g = new PerNodeDataMessage(); g.setExpireAt(Long.MAX_VALUE); - g.setKey("a"); - g.setPayload("b"); + g.setKey(key); + g.setPayload(payload); g.setTimestamp(System.currentTimeMillis()); return g; } - - private SharedDataMessage sharedMsg(){ - SharedDataMessage g = new SharedDataMessage(); - g.setExpireAt(Long.MAX_VALUE); - g.setKey("a"); - g.setPayload("c"); - g.setTimestamp(System.currentTimeMillis()); - return g; + + private SharedDataMessage generateSharedMsg(String key, Object payload){ + SharedDataMessage d = new SharedDataMessage(); + d.setKey(key); + d.setPayload(payload); + d.setExpireAt(Long.MAX_VALUE); + d.setTimestamp(System.currentTimeMillis()); + return d; } - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/602a79bf/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java ---------------------------------------------------------------------- diff --git a/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java index cbac460..3c90ea1 100644 --- a/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java +++ b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java @@ -22,6 +22,7 @@ import com.codahale.metrics.MetricRegistry; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.gossip.GossipSettings; import org.apache.gossip.Member; +import org.apache.gossip.crdt.LWWSet; import org.apache.gossip.crdt.OrSet; import org.apache.gossip.manager.GossipManager; import org.apache.gossip.manager.GossipManagerBuilder; @@ -80,6 +81,22 @@ public class JacksonTest { OrSet<Integer> back = objectMapper.readValue(s, OrSet.class); Assert.assertEquals(back, i); } + + @Test + public void jacksonCrdtLWWSetTest() { + ObjectMapper objectMapper = JacksonProtocolManager.buildObjectMapper(simpleSettings(new GossipSettings())); + + LWWSet<String> lww = new LWWSet<>("a", "b", "c"); + + try { + String lwwS = objectMapper.writeValueAsString(lww); + @SuppressWarnings("unchecked") + LWWSet<String> parsedLww = objectMapper.readValue(lwwS, LWWSet.class); + Assert.assertEquals(lww, parsedLww); + } catch (Exception e) { + Assert.fail("LWWSet se/de error"); + } + } @Test public void testMessageEqualityAssumptions() {
