Repository: incubator-gossip
Updated Branches:
  refs/heads/master 49d91084e -> daea6edb1


GOSSIP-21 WIP


Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/f7ee815d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/f7ee815d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/f7ee815d

Branch: refs/heads/master
Commit: f7ee815d919385a1dbcbaf3c1fc1cd530bf7c7d3
Parents: 4e13c89
Author: Edward Capriolo <[email protected]>
Authored: Fri Aug 12 20:07:48 2016 -0400
Committer: Edward Capriolo <[email protected]>
Committed: Fri Aug 12 20:07:48 2016 -0400

----------------------------------------------------------------------
 .../java/org/apache/gossip/GossipService.java   |  14 ++
 .../gossip/manager/ActiveGossipThread.java      | 193 ++++++++++++++++---
 .../org/apache/gossip/manager/GossipCore.java   |  29 ++-
 .../apache/gossip/manager/GossipManager.java    |  23 ++-
 .../random/RandomActiveGossipThread.java        | 120 ------------
 src/main/java/org/apache/gossip/model/Base.java |   5 +-
 .../apache/gossip/model/GossipDataMessage.java  |  50 +++++
 .../apache/gossip/udp/UdpGossipDataMessage.java |  31 +++
 src/test/java/org/apache/gossip/DataTest.java   |  81 ++++++++
 9 files changed, 392 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/f7ee815d/src/main/java/org/apache/gossip/GossipService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/GossipService.java 
b/src/main/java/org/apache/gossip/GossipService.java
index ce15992..6f9b5be 100644
--- a/src/main/java/org/apache/gossip/GossipService.java
+++ b/src/main/java/org/apache/gossip/GossipService.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.gossip.event.GossipListener;
 import org.apache.gossip.manager.GossipManager;
 import org.apache.gossip.manager.random.RandomGossipManager;
+import org.apache.gossip.model.GossipDataMessage;
 import org.apache.log4j.Logger;
 
 /**
@@ -81,6 +82,19 @@ public class GossipService {
   public GossipManager get_gossipManager() {
     return gossipManager;
   }
+  
+  /**
+   * Gossip data to the entire cluster
+   * @param message
+   */
+  public void gossipData(GossipDataMessage message){
+    gossipManager.gossipData(message);
+  }
+  
+  
+  public GossipDataMessage findGossipData(String nodeId, String key){
+    return this.get_gossipManager().findGossipData(nodeId, key);
+  }
 
   public void set_gossipManager(GossipManager _gossipManager) {
     this.gossipManager = _gossipManager;

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/f7ee815d/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java 
b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
index 181d9ae..4790c09 100644
--- a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
@@ -17,58 +17,176 @@
  */
 package org.apache.gossip.manager;
 
+import java.io.IOException;
+import java.net.DatagramSocket;
 import java.util.List;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.gossip.GossipService;
 import org.apache.gossip.LocalGossipMember;
+import org.apache.gossip.model.ActiveGossipOk;
+import org.apache.gossip.model.GossipDataMessage;
+import org.apache.gossip.model.GossipMember;
+import org.apache.gossip.model.Response;
+import org.apache.gossip.udp.UdpActiveGossipMessage;
+import org.apache.gossip.udp.UdpGossipDataMessage;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
 
 /**
  * [The active thread: periodically send gossip request.] The class handles 
gossiping the membership
  * list. This information is important to maintaining a common state among all 
the nodes, and is
  * important for detecting failures.
  */
-abstract public class ActiveGossipThread implements Runnable {
+public class ActiveGossipThread {
 
-  protected final GossipManager gossipManager;
+  private static final Logger LOGGER = 
Logger.getLogger(ActiveGossipThread.class);
+  
+  private final GossipManager gossipManager;
+  private final Random random;
+  private final GossipCore gossipCore;
+  private ScheduledExecutorService scheduledExecutorService;
+  private ObjectMapper MAPPER = new ObjectMapper();
 
-  private final AtomicBoolean keepRunning;
-
-  public ActiveGossipThread(GossipManager gossipManager) {
+  public ActiveGossipThread(GossipManager gossipManager, GossipCore 
gossipCore) {
     this.gossipManager = gossipManager;
-    this.keepRunning = new AtomicBoolean(true);
+    random = new Random();
+    this.gossipCore = gossipCore;
+    this.scheduledExecutorService = Executors.newScheduledThreadPool(1024);
   }
 
-  @Override
-  public void run() {
-    while (keepRunning.get()) {
-      try {
-        
TimeUnit.MILLISECONDS.sleep(gossipManager.getSettings().getGossipInterval());
-        
-        // contact a live member.
-        sendMembershipList(gossipManager.getMyself(), 
gossipManager.getLiveMembers());
-        
-        // contact a dead member.
-        sendMembershipList(gossipManager.getMyself(), 
gossipManager.getDeadMembers());
-        
-      } catch (InterruptedException e) {
-        GossipService.LOGGER.error(e);
-        keepRunning.set(false);
+  public void init(){
+    Runnable liveGossip = new Runnable(){
+      @Override
+      public void run() {
+        try {
+          sendMembershipList(gossipManager.getMyself(), 
gossipManager.getLiveMembers());
+        } catch (RuntimeException ex){
+          LOGGER.warn(ex);
+        }
       }
-    }
-    shutdown();
+    };
+    scheduledExecutorService.scheduleAtFixedRate(liveGossip, 0, 
+            gossipManager.getSettings().getGossipInterval(), 
TimeUnit.MILLISECONDS);
+    Runnable deadGossip = new Runnable(){
+      @Override
+      public void run() {
+        try {
+          sendMembershipList(gossipManager.getMyself(), 
gossipManager.getDeadMembers());
+        } catch (RuntimeException ex){
+          LOGGER.warn(ex);
+        }
+      }
+    };
+    scheduledExecutorService.scheduleAtFixedRate(deadGossip, 0, 
+            gossipManager.getSettings().getGossipInterval(), 
TimeUnit.MILLISECONDS);
+    Runnable dataGossip = new Runnable(){
+      @Override
+      public void run() {
+        try {
+          sendData(gossipManager.getMyself(), gossipManager.getLiveMembers());
+        } catch (RuntimeException ex){
+          LOGGER.warn(ex);
+        }
+      }
+    };
+    scheduledExecutorService.scheduleAtFixedRate(dataGossip, 0, 
+            gossipManager.getSettings().getGossipInterval(), 
TimeUnit.MILLISECONDS);
+    
   }
 
   public void shutdown() {
-    keepRunning.set(false);
+    scheduledExecutorService.shutdown();
+    try {
+      scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOGGER.warn(e);
+    }
   }
 
+  public void sendData(LocalGossipMember me, List<LocalGossipMember> 
memberList){
+    LocalGossipMember member = selectPartner(memberList);
+    if (member == null) {
+      GossipService.LOGGER.debug("Send sendMembershipList() is called without 
action");
+      return;
+    }
+    try (DatagramSocket socket = new DatagramSocket()) {
+      socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
+      for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> entry : 
gossipCore.getPerNodeData().entrySet()){
+        for (Entry<String, GossipDataMessage> innerEntry : 
entry.getValue().entrySet()){
+          UdpGossipDataMessage message = new UdpGossipDataMessage();
+          System.out.println("sending message " + message);
+          message.setUuid(UUID.randomUUID().toString());
+          message.setUriFrom(me.getId());
+          message.setExpireAt(innerEntry.getValue().getExpireAt());
+          message.setKey(innerEntry.getValue().getKey());
+          message.setNodeId(innerEntry.getValue().getNodeId());
+          message.setTimestamp(innerEntry.getValue().getTimestamp());
+          message.setPayload(innerEntry.getValue().getPayload());
+          message.setTimestamp(innerEntry.getValue().getTimestamp());
+          byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
+          int packet_length = json_bytes.length;
+          if (packet_length < GossipManager.MAX_PACKET_SIZE) {
+            //Response r = gossipCore.send(message, member.getUri());
+            gossipCore.sendOneWay(message, member.getUri());
+            //TODO: ack this message
+          } else {
+            GossipService.LOGGER.error("The length of the to be send message 
is too large ("
+                    + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + 
").");
+          }
+        }
+      }
+    } catch (IOException e1) {
+      GossipService.LOGGER.warn(e1);
+    }
+  }
+  
   /**
    * Performs the sending of the membership list, after we have incremented 
our own heartbeat.
    */
-  abstract protected void sendMembershipList(LocalGossipMember me,
-          List<LocalGossipMember> memberList);
+  protected void sendMembershipList(LocalGossipMember me, 
List<LocalGossipMember> memberList) {
+    
+    me.setHeartbeat(System.currentTimeMillis());
+    LocalGossipMember member = selectPartner(memberList);
+    if (member == null) {
+      GossipService.LOGGER.debug("Send sendMembershipList() is called without 
action");
+      return;
+    } else {
+      GossipService.LOGGER.debug("Send sendMembershipList() is called to " + 
member.toString());
+    }
+    
+    try (DatagramSocket socket = new DatagramSocket()) {
+      socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
+      UdpActiveGossipMessage message = new UdpActiveGossipMessage();
+      message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
+      message.setUuid(UUID.randomUUID().toString());
+      message.getMembers().add(convert(me));
+      for (LocalGossipMember other : memberList) {
+        message.getMembers().add(convert(other));
+      }
+      byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
+      int packet_length = json_bytes.length;
+      if (packet_length < GossipManager.MAX_PACKET_SIZE) {
+        Response r = gossipCore.send(message, member.getUri());
+        if (r instanceof ActiveGossipOk){
+          //maybe count metrics here
+        } else {
+          LOGGER.warn("Message "+ message + " generated response "+ r);
+        }
+      } else {
+        GossipService.LOGGER.error("The length of the to be send message is 
too large ("
+                + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + 
").");
+      }
+    } catch (IOException e1) {
+      GossipService.LOGGER.warn(e1);
+    }
+  }
 
   /**
    * Abstract method which should be implemented by a subclass. This method 
should return a member
@@ -78,5 +196,24 @@ abstract public class ActiveGossipThread implements 
Runnable {
    *          The list of members which are stored in the local list of 
members.
    * @return The chosen LocalGossipMember to gossip with.
    */
-  abstract protected LocalGossipMember selectPartner(List<LocalGossipMember> 
memberList);
+  protected LocalGossipMember selectPartner(List<LocalGossipMember> 
memberList) {
+    LocalGossipMember member = null;
+    if (memberList.size() > 0) {
+      int randomNeighborIndex = random.nextInt(memberList.size());
+      member = memberList.get(randomNeighborIndex);
+    } else {
+      GossipService.LOGGER.debug("I am alone in this world.");
+      
+    }
+    return member;
+  }
+  
+  private GossipMember convert(LocalGossipMember member){
+    GossipMember gm = new GossipMember();
+    gm.setCluster(member.getClusterName());
+    gm.setHeartbeat(member.getHeartbeat());
+    gm.setUri(member.getUri().toASCIIString());
+    gm.setId(member.getId());
+    return gm;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/f7ee815d/src/main/java/org/apache/gossip/manager/GossipCore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java 
b/src/main/java/org/apache/gossip/manager/GossipCore.java
index ab24621..8bcba46 100644
--- a/src/main/java/org/apache/gossip/manager/GossipCore.java
+++ b/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -21,10 +21,12 @@ import org.apache.gossip.LocalGossipMember;
 import org.apache.gossip.RemoteGossipMember;
 import org.apache.gossip.model.ActiveGossipMessage;
 import org.apache.gossip.model.Base;
+import org.apache.gossip.model.GossipDataMessage;
 import org.apache.gossip.model.Response;
 import org.apache.gossip.udp.Trackable;
 import org.apache.gossip.udp.UdpActiveGossipMessage;
 import org.apache.gossip.udp.UdpActiveGossipOk;
+import org.apache.gossip.udp.UdpGossipDataMessage;
 import org.apache.gossip.udp.UdpNotAMemberFault;
 import org.apache.log4j.Logger;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -34,15 +36,28 @@ public class GossipCore {
   public static final Logger LOGGER = Logger.getLogger(GossipCore.class);
   private static final ObjectMapper MAPPER = new ObjectMapper();
   private final GossipManager gossipManager;
-
   private ConcurrentHashMap<String, Base> requests;
-  
   private ExecutorService service;
+  private final ConcurrentHashMap<String, ConcurrentHashMap<String, 
GossipDataMessage>> perNodeData;
   
   public GossipCore(GossipManager manager){
     this.gossipManager = manager;
     requests = new ConcurrentHashMap<>();
     service = Executors.newFixedThreadPool(500);
+    perNodeData = new ConcurrentHashMap<>();
+  }
+  
+  public void addPerNodeData(GossipDataMessage message){
+    ConcurrentHashMap<String,GossipDataMessage> m = new ConcurrentHashMap<>();
+    m.put(message.getKey(), message);
+    m = perNodeData.putIfAbsent(message.getNodeId(), m);
+    if (m != null){
+      m.put(message.getKey(), message);    //TODO only put if > ts
+    }
+  }
+  
+  public ConcurrentHashMap<String, ConcurrentHashMap<String, 
GossipDataMessage>> getPerNodeData(){
+    return perNodeData;
   }
   
   public void shutdown(){
@@ -55,12 +70,22 @@ public class GossipCore {
   }
   
   public void recieve(Base base){
+    System.out.println(base);
     if (base instanceof Response){
       if (base instanceof Trackable){
         Trackable t = (Trackable) base;
         requests.put(t.getUuid() + "/" + t.getUriFrom(), (Base) t);
       }
     }
+    if (base instanceof GossipDataMessage) {
+      UdpGossipDataMessage message = (UdpGossipDataMessage) base;
+      addPerNodeData(message);
+      /*
+      UdpActiveGossipOk o = new UdpActiveGossipOk();
+      o.setUriFrom(message.getUriFrom());
+      o.setUuid(message.getUuid());
+      sendOneWay(o, senderMember.getUri());*/
+    }
     if (base instanceof ActiveGossipMessage){
       List<GossipMember> remoteGossipMembers = new ArrayList<>();
       RemoteGossipMember senderMember = null;

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/f7ee815d/src/main/java/org/apache/gossip/manager/GossipManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java 
b/src/main/java/org/apache/gossip/manager/GossipManager.java
index 79be431..36bc10a 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -40,7 +41,7 @@ import org.apache.gossip.LocalGossipMember;
 import org.apache.gossip.event.GossipListener;
 import org.apache.gossip.event.GossipState;
 import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
-import org.apache.gossip.manager.random.RandomActiveGossipThread;
+import org.apache.gossip.model.GossipDataMessage;
 
 public abstract class GossipManager extends Thread implements 
NotificationListener {
 
@@ -187,8 +188,8 @@ public abstract class GossipManager extends Thread 
implements NotificationListen
     }
     passiveGossipThread = new OnlyProcessReceivedPassiveGossipThread(this, 
gossipCore);
     gossipThreadExecutor.execute(passiveGossipThread);
-    activeGossipThread = new RandomActiveGossipThread(this, this.gossipCore);
-    gossipThreadExecutor.execute(activeGossipThread);
+    activeGossipThread = new ActiveGossipThread(this, this.gossipCore);
+    activeGossipThread.init();
     GossipService.LOGGER.debug("The GossipService is started.");
     while (gossipServiceRunning.get()) {
       try {
@@ -222,4 +223,20 @@ public abstract class GossipManager extends Thread 
implements NotificationListen
       LOGGER.error(e);
     }
   }
+  
+  public void gossipData(GossipDataMessage message){
+    message.setNodeId(me.getId());
+    gossipCore.addPerNodeData(message);
+    System.out.println(this.getMyself() + " " + gossipCore.getPerNodeData());
+  }
+  
+  public GossipDataMessage findGossipData(String nodeId, String key){
+    ConcurrentHashMap<String, GossipDataMessage> j = 
gossipCore.getPerNodeData().get(nodeId);
+    if (j == null){
+      return null;
+    } else {
+      return j.get(key);
+    }
+  }
+            
 }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/f7ee815d/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java 
b/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java
deleted file mode 100644
index 03d550c..0000000
--- 
a/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.manager.random;
-
-import java.io.IOException;
-import java.net.DatagramSocket;
-import java.net.InetAddress;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Random;
-import java.util.UUID;
-
-import org.apache.gossip.GossipService;
-import org.apache.gossip.LocalGossipMember;
-import org.apache.gossip.manager.ActiveGossipThread;
-import org.apache.gossip.manager.GossipCore;
-import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.model.ActiveGossipOk;
-import org.apache.gossip.model.GossipMember;
-import org.apache.gossip.model.Response;
-import org.apache.gossip.udp.UdpActiveGossipMessage;
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.map.ObjectMapper;
-
-public class RandomActiveGossipThread extends ActiveGossipThread {
-
-  public static final Logger LOGGER = 
Logger.getLogger(RandomActiveGossipThread.class);
-  protected ObjectMapper MAPPER = new ObjectMapper();
-  
-  /** The Random used for choosing a member to gossip with. */
-  private final Random random;
-  private final GossipCore gossipCore;
-
-  public RandomActiveGossipThread(GossipManager gossipManager, GossipCore 
gossipCore) {
-    super(gossipManager);
-    random = new Random();
-    this.gossipCore = gossipCore;
-  }
-
-  /**
-   * [The selectToSend() function.] Find a random peer from the local 
membership list. In the case
-   * where this client is the only member in the list, this method will return 
null.
-   * 
-   * @return Member random member if list is greater than 1, null otherwise
-   */
-  protected LocalGossipMember selectPartner(List<LocalGossipMember> 
memberList) {
-    LocalGossipMember member = null;
-    if (memberList.size() > 0) {
-      int randomNeighborIndex = random.nextInt(memberList.size());
-      member = memberList.get(randomNeighborIndex);
-    } else {
-      GossipService.LOGGER.debug("I am alone in this world.");
-      
-    }
-    return member;
-  }
-
-  protected void sendMembershipList(LocalGossipMember me, 
List<LocalGossipMember> memberList) {
-    
-    me.setHeartbeat(System.currentTimeMillis());
-    LocalGossipMember member = selectPartner(memberList);
-    if (member == null) {
-      GossipService.LOGGER.debug("Send sendMembershipList() is called without 
action");
-      return;
-    } else {
-      GossipService.LOGGER.debug("Send sendMembershipList() is called to " + 
member.toString());
-    }
-    
-    try (DatagramSocket socket = new DatagramSocket()) {
-      socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
-      UdpActiveGossipMessage message = new UdpActiveGossipMessage();
-      message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
-      message.setUuid(UUID.randomUUID().toString());
-      message.getMembers().add(convert(me));
-      for (LocalGossipMember other : memberList) {
-        message.getMembers().add(convert(other));
-      }
-      byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
-      int packet_length = json_bytes.length;
-      if (packet_length < GossipManager.MAX_PACKET_SIZE) {
-        Response r = gossipCore.send(message, member.getUri());
-        if (r instanceof ActiveGossipOk){
-          //maybe count metrics here
-        } else {
-          LOGGER.warn("Message "+ message + " generated response "+ r);
-        }
-      } else {
-        GossipService.LOGGER.error("The length of the to be send message is 
too large ("
-                + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + 
").");
-      }
-    } catch (IOException e1) {
-      GossipService.LOGGER.warn(e1);
-    }
-  }
-  
-  private GossipMember convert(LocalGossipMember member){
-    GossipMember gm = new GossipMember();
-    gm.setCluster(member.getClusterName());
-    gm.setHeartbeat(member.getHeartbeat());
-    gm.setUri(member.getUri().toASCIIString());
-    gm.setId(member.getId());
-    return gm;
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/f7ee815d/src/main/java/org/apache/gossip/model/Base.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/model/Base.java 
b/src/main/java/org/apache/gossip/model/Base.java
index ebb3215..66c2be6 100644
--- a/src/main/java/org/apache/gossip/model/Base.java
+++ b/src/main/java/org/apache/gossip/model/Base.java
@@ -2,6 +2,7 @@ package org.apache.gossip.model;
 
 import org.apache.gossip.udp.UdpActiveGossipMessage;
 import org.apache.gossip.udp.UdpActiveGossipOk;
+import org.apache.gossip.udp.UdpGossipDataMessage;
 import org.apache.gossip.udp.UdpNotAMemberFault;
 import org.codehaus.jackson.annotate.JsonSubTypes;
 import org.codehaus.jackson.annotate.JsonSubTypes.Type;
@@ -17,7 +18,9 @@ import org.codehaus.jackson.annotate.JsonTypeInfo;
         @Type(value = ActiveGossipOk.class, name = "ActiveGossipOk"),
         @Type(value = UdpActiveGossipOk.class, name = "UdpActiveGossipOk"),
         @Type(value = UdpActiveGossipMessage.class, name = 
"UdpActiveGossipMessage"),
-        @Type(value = UdpNotAMemberFault.class, name = "UdpNotAMemberFault")
+        @Type(value = UdpNotAMemberFault.class, name = "UdpNotAMemberFault"),
+        @Type(value = GossipDataMessage.class, name = "GossipDataMessage"),
+        @Type(value = UdpGossipDataMessage.class, name = 
"UdpGossipDataMessage")
         })
 public class Base {
 

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/f7ee815d/src/main/java/org/apache/gossip/model/GossipDataMessage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/model/GossipDataMessage.java 
b/src/main/java/org/apache/gossip/model/GossipDataMessage.java
new file mode 100644
index 0000000..2128dfe
--- /dev/null
+++ b/src/main/java/org/apache/gossip/model/GossipDataMessage.java
@@ -0,0 +1,50 @@
+package org.apache.gossip.model;
+
+public class GossipDataMessage extends Base {
+
+  
+  private String nodeId;
+  private String key;
+  private Object payload;
+  private Long timestamp;
+  private Long expireAt;
+  
+  public String getNodeId() {
+    return nodeId;
+  }
+  public void setNodeId(String nodeId) {
+    this.nodeId = nodeId;
+  }
+  public String getKey() {
+    return key;
+  }
+  public void setKey(String key) {
+    this.key = key;
+  }
+  public Object getPayload() {
+    return payload;
+  }
+  public void setPayload(Object payload) {
+    this.payload = payload;
+  }
+  public Long getTimestamp() {
+    return timestamp;
+  }
+  public void setTimestamp(Long timestamp) {
+    this.timestamp = timestamp;
+  }
+  public Long getExpireAt() {
+    return expireAt;
+  }
+  public void setExpireAt(Long expireAt) {
+    this.expireAt = expireAt;
+  }
+  @Override
+  public String toString() {
+    return "GossipDataMessage [nodeId=" + nodeId + ", key=" + key + ", 
payload=" + payload
+            + ", timestamp=" + timestamp + ", expireAt=" + expireAt + "]";
+  }
+
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/f7ee815d/src/main/java/org/apache/gossip/udp/UdpGossipDataMessage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/udp/UdpGossipDataMessage.java 
b/src/main/java/org/apache/gossip/udp/UdpGossipDataMessage.java
new file mode 100644
index 0000000..2ee4bbf
--- /dev/null
+++ b/src/main/java/org/apache/gossip/udp/UdpGossipDataMessage.java
@@ -0,0 +1,31 @@
+package org.apache.gossip.udp;
+
+import org.apache.gossip.model.GossipDataMessage;
+
+public class UdpGossipDataMessage extends GossipDataMessage implements 
Trackable {
+
+  private String uriFrom;
+  private String uuid;
+  
+  public String getUriFrom() {
+    return uriFrom;
+  }
+  
+  public void setUriFrom(String uriFrom) {
+    this.uriFrom = uriFrom;
+  }
+  
+  public String getUuid() {
+    return uuid;
+  }
+  
+  public void setUuid(String uuid) {
+    this.uuid = uuid;
+  }
+
+  @Override
+  public String toString() {
+    return "UdpGossipDataMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + "]";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/f7ee815d/src/test/java/org/apache/gossip/DataTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/gossip/DataTest.java 
b/src/test/java/org/apache/gossip/DataTest.java
new file mode 100644
index 0000000..3f42eeb
--- /dev/null
+++ b/src/test/java/org/apache/gossip/DataTest.java
@@ -0,0 +1,81 @@
+package org.apache.gossip;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.gossip.event.GossipListener;
+import org.apache.gossip.event.GossipState;
+import org.apache.gossip.model.GossipDataMessage;
+import org.junit.Test;
+
+import io.teknek.tunit.TUnit;
+
+public class DataTest {
+
+  @Test
+  public void abc() throws InterruptedException, UnknownHostException, 
URISyntaxException{
+    GossipSettings settings = new GossipSettings();
+    String cluster = UUID.randomUUID().toString();
+    int seedNodes = 1;
+    List<GossipMember> startupMembers = new ArrayList<>();
+    for (int i = 1; i < seedNodes+1; ++i) {
+      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+      startupMembers.add(new RemoteGossipMember(cluster, uri, i + ""));
+    }
+    final List<GossipService> clients = new ArrayList<>();
+    final int clusterMembers = 2;
+    for (int i = 1; i < clusterMembers+1; ++i) {
+      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+      GossipService gossipService = new GossipService(cluster, uri, i + "",
+              startupMembers, settings,
+              new GossipListener(){
+        public void gossipEvent(GossipMember member, GossipState state) {
+          System.out.println(member + " " + state);
+        }
+      });
+      clients.add(gossipService);
+      gossipService.start();
+    }
+    TUnit.assertThat(new Callable<Integer> (){
+      public Integer call() throws Exception {
+        int total = 0;
+        for (int i = 0; i < clusterMembers; ++i) {
+          total += clients.get(i).get_gossipManager().getLiveMembers().size();
+        }
+        return total;
+      }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
+    clients.get(0).gossipData(msg());
+    Thread.sleep(10000);
+    TUnit.assertThat(
+            
+            new Callable<Object> (){
+              public Object call() throws Exception {
+                GossipDataMessage x = clients.get(1).findGossipData(1+"" , 
"a");
+                if (x == null) return "";
+                else return x.getPayload();
+              }})
+            
+            
+            //() -> clients.get(1).findGossipData(1+"" , "a").getPayload())
+    .afterWaitingAtMost(20, TimeUnit.SECONDS)
+    .isEqualTo("b");
+    for (int i = 0; i < clusterMembers; ++i) {
+      clients.get(i).shutdown();
+    }
+  }
+  
+  private GossipDataMessage msg(){
+    GossipDataMessage g = new GossipDataMessage();
+    g.setExpireAt(Long.MAX_VALUE);
+    g.setKey("a");
+    g.setPayload("b");
+    g.setTimestamp(System.currentTimeMillis());
+    return g;
+  }
+}

Reply via email to