Repository: incubator-gossip
Updated Branches:
  refs/heads/master 026b8bb48 -> 22b9e756d


GOSSIP-71 not merging correctly (egc & maxim)


Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/22b9e756
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/22b9e756
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/22b9e756

Branch: refs/heads/master
Commit: 22b9e756d4b16f6108f563ca764934220be3e452
Parents: 026b8bb
Author: Edward Capriolo <[email protected]>
Authored: Mon Feb 27 23:54:43 2017 -0500
Committer: Edward Capriolo <[email protected]>
Committed: Wed Mar 1 14:27:34 2017 -0500

----------------------------------------------------------------------
 src/main/java/org/apache/gossip/crdt/OrSet.java | 79 ++++++++---------
 .../examples/StandAloneNodeCrdtOrSet.java       | 91 ++++++++++++++++++++
 .../org/apache/gossip/manager/GossipCore.java   | 40 +++++----
 .../apache/gossip/manager/GossipManager.java    |  1 +
 .../java/org/apache/gossip/crdt/OrSetTest.java  | 13 ++-
 5 files changed, 164 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/22b9e756/src/main/java/org/apache/gossip/crdt/OrSet.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/crdt/OrSet.java 
b/src/main/java/org/apache/gossip/crdt/OrSet.java
index 972377f..f84dbc7 100644
--- a/src/main/java/org/apache/gossip/crdt/OrSet.java
+++ b/src/main/java/org/apache/gossip/crdt/OrSet.java
@@ -17,16 +17,9 @@
  */
 package org.apache.gossip.crdt;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
-import java.util.Set;
-import java.util.UUID;
+import java.util.function.BiConsumer;
 
 import org.apache.gossip.crdt.OrSet.Builder.Operation;
 
@@ -86,11 +79,34 @@ public class OrSet<E>  implements Crdt<Set<E>, OrSet<E>> {
     val = computeValue();
   }
 
+  static Set<UUID> mergeSets(Set<UUID> a, Set<UUID> b) {
+    if ((a == null || a.isEmpty()) && (b == null || b.isEmpty())) {
+      return null;
+    }
+    Set<UUID> res = new HashSet<>(a);
+    res.addAll(b);
+    return res;
+  }
+
+  private void internalSetMerge(Map<E, Set<UUID>> map, E key, Set<UUID> value) 
{
+    if (value == null) {
+      return;
+    }
+    map.merge(key, value, OrSet::mergeSets);
+  }
+
   public OrSet(OrSet<E> left, OrSet<E> right){
-    elements.putAll(left.elements);
-    elements.putAll(right.elements);
-    tombstones.putAll(left.tombstones);
-    tombstones.putAll(right.tombstones);
+    BiConsumer<Map<E, Set<UUID>>, Map<E, Set<UUID>>> internalMerge = (items, 
other) -> {
+      for (Entry<E, Set<UUID>> l : other.entrySet()){
+        internalSetMerge(items, l.getKey(), l.getValue());
+      }
+    };
+
+    internalMerge.accept(elements, left.elements);
+    internalMerge.accept(elements, right.elements);
+    internalMerge.accept(tombstones, left.tombstones);
+    internalMerge.accept(tombstones, right.tombstones);
+
     val = computeValue();
   }
   
@@ -103,29 +119,14 @@ public class OrSet<E>  implements Crdt<Set<E>, OrSet<E>> {
     return new OrSet<E>(this, other);
   }
   
-  private void internalAdd(E element){
-    Set<UUID> l = elements.get(element);
-    if (l == null){
-      Set<UUID> d = new HashSet<UUID>();
-      d.add(UUID.randomUUID());
-      elements.put(element, d);
-    } else {
-      l.add(UUID.randomUUID());
-    }
+  private void internalAdd(E element) {
+    Set<UUID> toMerge = new HashSet<>();
+    toMerge.add(UUID.randomUUID());
+    internalSetMerge(elements, element, toMerge);
   }
   
   private void internalRemove(E element){
-    Set<UUID> elementIds = elements.get(element);
-    if (elementIds == null){
-      //deleting elements not in the list
-      return;
-    }
-    Set<UUID> current = tombstones.get(element);
-    if (current != null){
-      current.addAll(elementIds);
-    } else {
-      tombstones.put(element, elementIds);
-    }
+    internalSetMerge(tombstones, element, elements.get(element));
   }
 
   /*
@@ -134,18 +135,10 @@ public class OrSet<E>  implements Crdt<Set<E>, OrSet<E>> {
   private Set<E> computeValue(){
     Set<E> values = new HashSet<>();
     for (Entry<E, Set<UUID>> entry: elements.entrySet()){
-      if (entry.getValue() == null || entry.getValue().size() == 0){
-        continue;
-      }
       Set<UUID> deleteIds = tombstones.get(entry.getKey());
-      if (deleteIds == null){
+      // if not all tokens for current element are in tombstones
+      if (deleteIds == null || !deleteIds.containsAll(entry.getValue())) {
         values.add(entry.getKey());
-      } else {
-        if (!deleteIds.containsAll(entry.getValue())){
-          values.add(entry.getKey());
-        } else {
-          //if all the entry uuid is deleted the entry is deleted
-        }
       }
     }
     return values;

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/22b9e756/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java 
b/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java
new file mode 100644
index 0000000..d1c1751
--- /dev/null
+++ b/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.examples;
+
+import com.codahale.metrics.MetricRegistry;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.HashMap;
+
+import org.apache.gossip.GossipService;
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.RemoteGossipMember;
+import org.apache.gossip.crdt.OrSet;
+import org.apache.gossip.model.SharedGossipDataMessage;
+
+public class StandAloneNodeCrdtOrSet {
+  public static void main (String [] args) throws InterruptedException, 
IOException{
+    GossipSettings s = new GossipSettings();
+    s.setWindowSize(10);
+    s.setConvictThreshold(1.0);
+    s.setGossipInterval(10);
+    GossipService gossipService = new GossipService("mycluster",  
URI.create(args[0]), args[1], new HashMap<String, String>(),
+            Arrays.asList( new RemoteGossipMember("mycluster", 
URI.create(args[2]), args[3])), s, (a,b) -> {}, new MetricRegistry());
+    gossipService.start();
+    
+    new Thread(() -> {
+      while (true){
+      System.out.println("Live: " + 
gossipService.getGossipManager().getLiveMembers());
+      System.out.println("Dead: " + 
gossipService.getGossipManager().getDeadMembers());
+      System.out.println("---------- " + 
(gossipService.getGossipManager().findCrdt("abc") == null ? "": 
+          gossipService.getGossipManager().findCrdt("abc").value()));
+      System.out.println("********** " + 
gossipService.getGossipManager().findCrdt("abc"));
+      try {
+        Thread.sleep(2000);
+      } catch (Exception e) {}
+      }
+    }).start();
+    
+    String line = null;
+    try (BufferedReader br = new BufferedReader(new 
InputStreamReader(System.in))){
+      while ( (line = br.readLine()) != null){
+        System.out.println(line);
+        char op = line.charAt(0);
+        String val = line.substring(2);
+        if (op == 'a'){
+          addData(val, gossipService);
+        } else {
+          removeData(val, gossipService);
+        }
+      }
+    }
+  }
+  
+  private static void removeData(String val, GossipService gossipService){
+    OrSet<String> s = (OrSet<String>) 
gossipService.getGossipManager().findCrdt("abc");
+    SharedGossipDataMessage m = new SharedGossipDataMessage();
+    m.setExpireAt(Long.MAX_VALUE);
+    m.setKey("abc");
+    m.setPayload(new OrSet<String>(s , new 
OrSet.Builder<String>().remove(val)));
+    m.setTimestamp(System.currentTimeMillis());
+    gossipService.getGossipManager().merge(m);
+  }
+  
+  private static void addData(String val, GossipService gossipService){
+    SharedGossipDataMessage m = new SharedGossipDataMessage();
+    m.setExpireAt(Long.MAX_VALUE);
+    m.setKey("abc");
+    m.setPayload(new OrSet<String>(val));
+    m.setTimestamp(System.currentTimeMillis());
+    gossipService.getGossipManager().merge(m);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/22b9e756/src/main/java/org/apache/gossip/manager/GossipCore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java 
b/src/main/java/org/apache/gossip/manager/GossipCore.java
index dff6413..a24b125 100644
--- a/src/main/java/org/apache/gossip/manager/GossipCore.java
+++ b/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -114,28 +114,36 @@ public class GossipCore implements GossipCoreConstants {
 
   @SuppressWarnings({ "unchecked", "rawtypes" })
   public void addSharedData(SharedGossipDataMessage message) {
-    SharedGossipDataMessage previous = sharedData.get(message.getKey());
-    if (previous == null) {
-      sharedData.putIfAbsent(message.getKey(), message);
-    } else {
+    while (true){
+      SharedGossipDataMessage previous = 
sharedData.putIfAbsent(message.getKey(), message);
+      if (previous == null){
+        return;
+      }
       if (message.getPayload() instanceof Crdt){
-        SharedGossipDataMessage curretnt = sharedData.get(message.getKey());
         SharedGossipDataMessage merged = new SharedGossipDataMessage();
         merged.setExpireAt(message.getExpireAt());
-        merged.setKey(curretnt.getKey());
+        merged.setKey(message.getKey());
         merged.setNodeId(message.getNodeId());
         merged.setTimestamp(message.getTimestamp());
-        Crdt mergedCrdt = ((Crdt) 
message.getPayload()).merge((Crdt)curretnt.getPayload());
-        merged.setPayload( mergedCrdt );
-        sharedData.put(curretnt.getKey(), merged);
+        Crdt mergedCrdt = ((Crdt) previous.getPayload()).merge((Crdt) 
message.getPayload());
+        merged.setPayload(mergedCrdt);
+        boolean replaced = sharedData.replace(message.getKey(), previous, 
merged);
+        if (replaced){
+          return;
+        }
       } else {
-        if (previous.getTimestamp() < message.getTimestamp()) {
-          sharedData.replace(message.getKey(), previous, message);
+        if (previous.getTimestamp() < message.getTimestamp()){
+          boolean result = sharedData.replace(message.getKey(), previous, 
message);
+          if (result){
+            return;
+          }
+        } else {
+          return;
         }
       }
     }
   }
-
+  
   public void addPerNodeData(GossipDataMessage message){
     ConcurrentHashMap<String,GossipDataMessage> nodeMap = new 
ConcurrentHashMap<>();
     nodeMap.put(message.getKey(), message);
@@ -363,8 +371,8 @@ public class GossipCore implements GossipCoreConstants {
   @SuppressWarnings("rawtypes")
   public Crdt merge(SharedGossipDataMessage message) {
     for (;;){
-      SharedGossipDataMessage ret = sharedData.putIfAbsent(message.getKey(), 
message);
-      if (ret == null){
+      SharedGossipDataMessage previous = 
sharedData.putIfAbsent(message.getKey(), message);
+      if (previous == null){
         return (Crdt) message.getPayload();
       }
       SharedGossipDataMessage copy = new SharedGossipDataMessage();
@@ -373,9 +381,9 @@ public class GossipCore implements GossipCoreConstants {
       copy.setNodeId(message.getNodeId());
       copy.setTimestamp(message.getTimestamp());
       @SuppressWarnings("unchecked")
-      Crdt merged = ((Crdt) ret.getPayload()).merge((Crdt) 
message.getPayload());
+      Crdt merged = ((Crdt) previous.getPayload()).merge((Crdt) 
message.getPayload());
       copy.setPayload(merged);
-      boolean replaced = sharedData.replace(message.getKey(), ret, copy);
+      boolean replaced = sharedData.replace(message.getKey(), previous, copy);
       if (replaced){
         return merged;
       }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/22b9e756/src/main/java/org/apache/gossip/manager/GossipManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java 
b/src/main/java/org/apache/gossip/manager/GossipManager.java
index 0140f00..4b28f2f 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -317,6 +317,7 @@ public abstract class GossipManager {
     }
     return gossipCore.merge(message);
   }
+  
   public GossipDataMessage findPerNodeGossipData(String nodeId, String key){
     ConcurrentHashMap<String, GossipDataMessage> j = 
gossipCore.getPerNodeData().get(nodeId);
     if (j == null){

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/22b9e756/src/test/java/org/apache/gossip/crdt/OrSetTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/gossip/crdt/OrSetTest.java 
b/src/test/java/org/apache/gossip/crdt/OrSetTest.java
index 8b8766a..e576764 100644
--- a/src/test/java/org/apache/gossip/crdt/OrSetTest.java
+++ b/src/test/java/org/apache/gossip/crdt/OrSetTest.java
@@ -102,4 +102,15 @@ public class OrSetTest {
     Assert.assertEquals(back, i);
   }
   
-}
+  @Test
+  public void mergeTestSame() {
+    OrSet<Integer> i = new OrSet<>(19);
+    OrSet<Integer> j = new OrSet<>(19);
+    OrSet<Integer> k = i.merge(j);
+    Assert.assertEquals(2, k.getElements().get(19).size());
+    OrSet<Integer> y = new OrSet<>(k, new OrSet.Builder<Integer>().remove(19));
+    Assert.assertEquals(2, y.getTombstones().get(19).size());
+    Assert.assertEquals(2, y.getElements().get(19).size());
+    Assert.assertEquals(new OrSet<Integer>().value(), y.value());
+  }
+}
\ No newline at end of file

Reply via email to