GOSSIP-21 Gossip user defined data

Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/3d554f61
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/3d554f61
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/3d554f61

Branch: refs/heads/master
Commit: 3d554f610e1236c98613bf5c34bffcb5a800faea
Parents: 5c660b2 5590758
Author: Edward Capriolo <edlinuxg...@gmail.com>
Authored: Sun Oct 2 15:02:45 2016 -0400
Committer: Edward Capriolo <edlinuxg...@gmail.com>
Committed: Sun Oct 2 15:02:45 2016 -0400

----------------------------------------------------------------------
 pom.xml                                         |  80 +++++----
 .../java/org/apache/gossip/GossipService.java   |   4 +-
 .../gossip/manager/ActiveGossipThread.java      |  78 +++-----
 .../org/apache/gossip/manager/GossipCore.java   |  14 +-
 .../apache/gossip/manager/GossipManager.java    |  15 +-
 .../gossip/manager/PassiveGossipThread.java     |  13 --
 .../OnlyProcessReceivedPassiveGossipThread.java |  80 ---------
 .../apache/gossip/model/GossipDataMessage.java  |   1 -
 src/test/java/org/apache/gossip/DataTest.java   |   4 +-
 .../org/apache/gossip/ShutdownDeadtimeTest.java | 178 ++++++++++---------
 .../org/apache/gossip/StartupSettingsTest.java  |   6 +
 .../org/apache/gossip/TenNodeThreeSeedTest.java |   8 +-
 .../manager/RandomGossipManagerBuilderTest.java |   8 +-
 13 files changed, 185 insertions(+), 304 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3d554f61/src/main/java/org/apache/gossip/GossipService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3d554f61/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
----------------------------------------------------------------------
diff --cc src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
index 4790c09,b57c25a..19caffe
--- a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
@@@ -20,22 -20,17 +20,23 @@@ package org.apache.gossip.manager
  import java.io.IOException;
  import java.net.DatagramSocket;
  import java.util.List;
++
 +import java.util.Map.Entry;
  import java.util.Random;
  import java.util.UUID;
 +import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.Executors;
  import java.util.concurrent.ScheduledExecutorService;
  import java.util.concurrent.TimeUnit;
 -import org.apache.gossip.GossipService;
 +
- import org.apache.gossip.GossipService;
  import org.apache.gossip.LocalGossipMember;
  import org.apache.gossip.model.ActiveGossipOk;
 +import org.apache.gossip.model.GossipDataMessage;
  import org.apache.gossip.model.GossipMember;
  import org.apache.gossip.model.Response;
  import org.apache.gossip.udp.UdpActiveGossipMessage;
 +import org.apache.gossip.udp.UdpGossipDataMessage;
++
  import org.apache.log4j.Logger;
  import org.codehaus.jackson.map.ObjectMapper;
  
@@@ -56,109 -51,41 +57,77 @@@ public class ActiveGossipThread 
  
    public ActiveGossipThread(GossipManager gossipManager, GossipCore 
gossipCore) {
      this.gossipManager = gossipManager;
 -    this.scheduledExecutorService = Executors.newScheduledThreadPool(1024);
 +    random = new Random();
      this.gossipCore = gossipCore;
 -    this.random = new Random();
 +    this.scheduledExecutorService = Executors.newScheduledThreadPool(1024);
    }
--
-   public void init(){
-     Runnable liveGossip = new Runnable(){
-       @Override
-       public void run() {
-         try {
-           sendMembershipList(gossipManager.getMyself(), 
gossipManager.getLiveMembers());
-         } catch (RuntimeException ex){
-           LOGGER.warn(ex);
-         }
-       }
-     };
-     scheduledExecutorService.scheduleAtFixedRate(liveGossip, 0, 
++ 
+   public void init() {
+     scheduledExecutorService.scheduleAtFixedRate(
+             () -> sendMembershipList(gossipManager.getMyself(), 
gossipManager.getLiveMembers()), 0,
              gossipManager.getSettings().getGossipInterval(), 
TimeUnit.MILLISECONDS);
-     Runnable deadGossip = new Runnable(){
-       @Override
-       public void run() {
-         try {
-           sendMembershipList(gossipManager.getMyself(), 
gossipManager.getDeadMembers());
-         } catch (RuntimeException ex){
-           LOGGER.warn(ex);
-         }
-       }
-     };
-     scheduledExecutorService.scheduleAtFixedRate(deadGossip, 0, 
+     scheduledExecutorService.scheduleAtFixedRate(
+             () -> sendMembershipList(gossipManager.getMyself(), 
gossipManager.getDeadMembers()), 0,
              gossipManager.getSettings().getGossipInterval(), 
TimeUnit.MILLISECONDS);
-     Runnable dataGossip = new Runnable(){
-       @Override
-       public void run() {
-         try {
-           sendData(gossipManager.getMyself(), gossipManager.getLiveMembers());
-         } catch (RuntimeException ex){
-           LOGGER.warn(ex);
-         }
-       }
-     };
-     scheduledExecutorService.scheduleAtFixedRate(dataGossip, 0, 
++    scheduledExecutorService.scheduleAtFixedRate(
++            () -> sendData(gossipManager.getMyself(), 
gossipManager.getLiveMembers()), 0,
 +            gossipManager.getSettings().getGossipInterval(), 
TimeUnit.MILLISECONDS);
-     
    }
- 
+   
    public void shutdown() {
 -    this.scheduledExecutorService.shutdown();
 +    scheduledExecutorService.shutdown();
      try {
 -      this.scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
 +      scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
-       LOGGER.warn(e);
 -      LOGGER.warn("Did not complete shutdown", e);
++      LOGGER.debug("Issue during shurdown" + e);
      }
    }
  
 +  public void sendData(LocalGossipMember me, List<LocalGossipMember> 
memberList){
 +    LocalGossipMember member = selectPartner(memberList);
 +    if (member == null) {
-       GossipService.LOGGER.debug("Send sendMembershipList() is called without 
action");
++      LOGGER.debug("Send sendMembershipList() is called without action");
 +      return;
 +    }
 +    try (DatagramSocket socket = new DatagramSocket()) {
 +      socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
 +      for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> entry 
: gossipCore.getPerNodeData().entrySet()){
 +        for (Entry<String, GossipDataMessage> innerEntry : 
entry.getValue().entrySet()){
 +          UdpGossipDataMessage message = new UdpGossipDataMessage();
-           System.out.println("sending message " + message);
 +          message.setUuid(UUID.randomUUID().toString());
 +          message.setUriFrom(me.getId());
 +          message.setExpireAt(innerEntry.getValue().getExpireAt());
 +          message.setKey(innerEntry.getValue().getKey());
 +          message.setNodeId(innerEntry.getValue().getNodeId());
 +          message.setTimestamp(innerEntry.getValue().getTimestamp());
 +          message.setPayload(innerEntry.getValue().getPayload());
 +          message.setTimestamp(innerEntry.getValue().getTimestamp());
 +          byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
 +          int packet_length = json_bytes.length;
 +          if (packet_length < GossipManager.MAX_PACKET_SIZE) {
-             //Response r = gossipCore.send(message, member.getUri());
 +            gossipCore.sendOneWay(message, member.getUri());
-             //TODO: ack this message
 +          } else {
-             GossipService.LOGGER.error("The length of the to be send message 
is too large ("
++            LOGGER.error("The length of the to be send message is too large ("
 +                    + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + 
").");
 +          }
 +        }
 +      }
 +    } catch (IOException e1) {
-       GossipService.LOGGER.warn(e1);
++      LOGGER.warn(e1);
 +    }
 +  }
 +  
    /**
     * Performs the sending of the membership list, after we have incremented 
our own heartbeat.
     */
-   protected void sendMembershipList(LocalGossipMember me, 
List<LocalGossipMember> memberList) {
 - protected void sendMembershipList(LocalGossipMember me, 
List<LocalGossipMember> memberList) {
--    
++  protected void sendMembershipList(LocalGossipMember me, 
List<LocalGossipMember> memberList) {  
      me.setHeartbeat(System.currentTimeMillis());
      LocalGossipMember member = selectPartner(memberList);
      if (member == null) {
--      GossipService.LOGGER.debug("Send sendMembershipList() is called without 
action");
++      LOGGER.debug("Send sendMembershipList() is called without action");
        return;
      } else {
--      GossipService.LOGGER.debug("Send sendMembershipList() is called to " + 
member.toString());
++      LOGGER.debug("Send sendMembershipList() is called to " + 
member.toString());
      }
      
      try (DatagramSocket socket = new DatagramSocket()) {
@@@ -180,33 -107,31 +149,30 @@@
            LOGGER.warn("Message "+ message + " generated response "+ r);
          }
        } else {
--        GossipService.LOGGER.error("The length of the to be send message is 
too large ("
++        LOGGER.error("The length of the to be send message is too large ("
                  + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + 
").");
        }
      } catch (IOException e1) {
--      GossipService.LOGGER.warn(e1);
++      LOGGER.warn(e1);
      }
    }
- 
++  
    /**
--   * Abstract method which should be implemented by a subclass. This method 
should return a member
--   * of the list to gossip with.
     * 
     * @param memberList
     *          The list of members which are stored in the local list of 
members.
     * @return The chosen LocalGossipMember to gossip with.
     */
 - protected LocalGossipMember selectPartner(List<LocalGossipMember> 
memberList) {
 -   LocalGossipMember member = null;
 -   if (memberList.size() > 0) {
 -     int randomNeighborIndex = random.nextInt(memberList.size());
 -     member = memberList.get(randomNeighborIndex);
 -   } else {
 -     LOGGER.debug("I am alone in this world.");
 -   }
 -   return member;
 - }
 +  protected LocalGossipMember selectPartner(List<LocalGossipMember> 
memberList) {
 +    LocalGossipMember member = null;
 +    if (memberList.size() > 0) {
 +      int randomNeighborIndex = random.nextInt(memberList.size());
 +      member = memberList.get(randomNeighborIndex);
 +    } else {
-       GossipService.LOGGER.debug("I am alone in this world.");
-       
++      LOGGER.debug("I am alone in this world.");
 +    }
 +    return member;
 +  }
    
    private GossipMember convert(LocalGossipMember member){
      GossipMember gm = new GossipMember();

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3d554f61/src/main/java/org/apache/gossip/manager/GossipCore.java
----------------------------------------------------------------------
diff --cc src/main/java/org/apache/gossip/manager/GossipCore.java
index 8bcba46,ab24621..46d855a
--- a/src/main/java/org/apache/gossip/manager/GossipCore.java
+++ b/src/main/java/org/apache/gossip/manager/GossipCore.java
@@@ -44,20 -43,6 +44,24 @@@ public class GossipCore 
      this.gossipManager = manager;
      requests = new ConcurrentHashMap<>();
      service = Executors.newFixedThreadPool(500);
 +    perNodeData = new ConcurrentHashMap<>();
 +  }
 +  
++  /**
++   *  
++   * @param message
++   */
 +  public void addPerNodeData(GossipDataMessage message){
 +    ConcurrentHashMap<String,GossipDataMessage> m = new ConcurrentHashMap<>();
 +    m.put(message.getKey(), message);
 +    m = perNodeData.putIfAbsent(message.getNodeId(), m);
 +    if (m != null){
 +      m.put(message.getKey(), message);    //TODO only put if > ts
 +    }
 +  }
 +  
 +  public ConcurrentHashMap<String, ConcurrentHashMap<String, 
GossipDataMessage>> getPerNodeData(){
 +    return perNodeData;
    }
    
    public void shutdown(){
@@@ -77,15 -61,6 +80,10 @@@
          requests.put(t.getUuid() + "/" + t.getUriFrom(), (Base) t);
        }
      }
 +    if (base instanceof GossipDataMessage) {
 +      UdpGossipDataMessage message = (UdpGossipDataMessage) base;
 +      addPerNodeData(message);
-       /*
-       UdpActiveGossipOk o = new UdpActiveGossipOk();
-       o.setUriFrom(message.getUriFrom());
-       o.setUuid(message.getUuid());
-       sendOneWay(o, senderMember.getUri());*/
 +    }
      if (base instanceof ActiveGossipMessage){
        List<GossipMember> remoteGossipMembers = new ArrayList<>();
        RemoteGossipMember senderMember = null;
@@@ -178,11 -153,11 +176,11 @@@
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      } catch (ExecutionException e) {
--      LOGGER.error(e.getMessage(), e);
++      LOGGER.debug(e.getMessage(), e);
        return null;
      } catch (TimeoutException e) {
        boolean cancelled = response.cancel(true);
--      LOGGER.error(String.format("Threadpool timeout attempting to contact 
%s, cancelled ? %b", uri.toString(), cancelled));
++      LOGGER.debug(String.format("Threadpool timeout attempting to contact 
%s, cancelled ? %b", uri.toString(), cancelled));
        return null; 
      } finally {
        if (t != null){

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3d554f61/src/main/java/org/apache/gossip/manager/GossipManager.java
----------------------------------------------------------------------
diff --cc src/main/java/org/apache/gossip/manager/GossipManager.java
index 36bc10a,0b2cfd2..94b57d1
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@@ -41,9 -40,8 +41,11 @@@ import org.apache.gossip.LocalGossipMem
  import org.apache.gossip.event.GossipListener;
  import org.apache.gossip.event.GossipState;
  import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
+ 
 +import org.apache.gossip.model.GossipDataMessage;
 +
- public abstract class GossipManager extends Thread implements 
NotificationListener {
++
+ public abstract class GossipManager implements NotificationListener {
  
    public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
  
@@@ -223,20 -213,4 +217,19 @@@
        LOGGER.error(e);
      }
    }
 +  
 +  public void gossipData(GossipDataMessage message){
 +    message.setNodeId(me.getId());
 +    gossipCore.addPerNodeData(message);
-     System.out.println(this.getMyself() + " " + gossipCore.getPerNodeData());
 +  }
 +  
 +  public GossipDataMessage findGossipData(String nodeId, String key){
 +    ConcurrentHashMap<String, GossipDataMessage> j = 
gossipCore.getPerNodeData().get(nodeId);
 +    if (j == null){
 +      return null;
 +    } else {
 +      return j.get(key);
 +    }
 +  }
 +            
  }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3d554f61/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
----------------------------------------------------------------------
diff --cc src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
index 6d440de,6d440de..11c371e
--- a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
@@@ -121,17 -121,17 +121,4 @@@ abstract public class PassiveGossipThre
      }
    }
  
--  /**
--   * Abstract method for merging the local and remote list.
--   * 
--   * @param gossipManager
--   *          The GossipManager for retrieving the local members and dead 
members list.
--   * @param senderMember
--   *          The member who is sending this list, this could be used to send 
a response if the
--   *          remote list contains out-dated information.
--   * @param remoteList
--   *          The list of members known at the remote side.
--   */
--  abstract protected void mergeLists(GossipManager gossipManager, 
RemoteGossipMember senderMember,
--          List<GossipMember> remoteList);
  }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3d554f61/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
----------------------------------------------------------------------
diff --cc 
src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
index 0b51573,0b51573..dff5056
--- 
a/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
+++ 
b/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
@@@ -17,11 -17,11 +17,6 @@@
   */
  package org.apache.gossip.manager.impl;
  
--import java.util.List;
--
--import org.apache.gossip.GossipMember;
--import org.apache.gossip.LocalGossipMember;
--import org.apache.gossip.RemoteGossipMember;
  import org.apache.gossip.manager.GossipCore;
  import org.apache.gossip.manager.GossipManager;
  import org.apache.gossip.manager.PassiveGossipThread;
@@@ -35,79 -35,79 +30,4 @@@ public class OnlyProcessReceivedPassive
      super(gossipManager, gossipCore);
    }
  
--  /**
--   * Merge remote list (received from peer), and our local member list. 
Simply, we must update the
--   * heartbeats that the remote list has with our list. Also, some additional 
logic is needed to
--   * make sure we have not timed out a member and then immediately received a 
list with that member.
--   * 
--   * @param gossipManager
--   * @param senderMember
--   * @param remoteList
--   */
--  protected void mergeLists(GossipManager gossipManager, RemoteGossipMember 
senderMember,
--          List<GossipMember> remoteList) {
--
--    // if the person sending to us is in the dead list consider them up
--    for (LocalGossipMember i : gossipManager.getDeadList()) {
--      if (i.getId().equals(senderMember.getId())) {
--        LOGGER.info(gossipManager.getMyself() + " contacted by dead member " 
+ senderMember.getUri());
--        LocalGossipMember newLocalMember = new 
LocalGossipMember(senderMember.getClusterName(),
--                senderMember.getUri(), senderMember.getId(),
--                senderMember.getHeartbeat(), gossipManager, 
gossipManager.getSettings()
--                        .getCleanupInterval());
--        gossipManager.reviveMember(newLocalMember);
--        newLocalMember.startTimeoutTimer();
--      }
--    }
--    for (GossipMember remoteMember : remoteList) {
--      if (remoteMember.getId().equals(gossipManager.getMyself().getId())) {
--        continue;
--      }
--      if (gossipManager.getLiveMembers().contains(remoteMember)) {
--        LocalGossipMember localMember = gossipManager.getLiveMembers().get(
--                gossipManager.getLiveMembers().indexOf(remoteMember));
--        if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) {
--          localMember.setHeartbeat(remoteMember.getHeartbeat());
--          localMember.resetTimeoutTimer();
--        }
--      } else if (!gossipManager.getLiveMembers().contains(remoteMember)
--              && !gossipManager.getDeadList().contains(remoteMember)) {
--        LocalGossipMember newLocalMember = new 
LocalGossipMember(remoteMember.getClusterName(),
--                remoteMember.getUri(), remoteMember.getId(),
--                remoteMember.getHeartbeat(), gossipManager, 
gossipManager.getSettings()
--                        .getCleanupInterval());
--        gossipManager.createOrReviveMember(newLocalMember);
--        newLocalMember.startTimeoutTimer();
--      } else {
--        if (gossipManager.getDeadList().contains(remoteMember)) {
--          LocalGossipMember localDeadMember = gossipManager.getDeadList().get(
--                  gossipManager.getDeadList().indexOf(remoteMember));
--          if (remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) {
--            LocalGossipMember newLocalMember = new 
LocalGossipMember(remoteMember.getClusterName(),
--                    remoteMember.getUri(), remoteMember.getId(),
--                    remoteMember.getHeartbeat(), gossipManager, 
gossipManager.getSettings()
--                            .getCleanupInterval());
--            gossipManager.reviveMember(newLocalMember);
--            newLocalMember.startTimeoutTimer();
--            LOGGER.debug("Removed remote member " + remoteMember.getAddress()
--                    + " from dead list and added to local member list.");
--          } else {
--            LOGGER.debug("me " + gossipManager.getMyself());
--            LOGGER.debug("sender " + senderMember);
--            LOGGER.debug("remote " + remoteList);
--            LOGGER.debug("live " + gossipManager.getLiveMembers());
--            LOGGER.debug("dead " + gossipManager.getDeadList());
--          }
--        } else {
--          LOGGER.debug("me " + gossipManager.getMyself());
--          LOGGER.debug("sender " + senderMember);
--          LOGGER.debug("remote " + remoteList);
--          LOGGER.debug("live " + gossipManager.getLiveMembers());
--          LOGGER.debug("dead " + gossipManager.getDeadList());
--          // throw new IllegalArgumentException("wtf");
--        }
--      }
--    }
--  }
--
  }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3d554f61/src/main/java/org/apache/gossip/model/GossipDataMessage.java
----------------------------------------------------------------------
diff --cc src/main/java/org/apache/gossip/model/GossipDataMessage.java
index 2128dfe,0000000..835c668
mode 100644,000000..100644
--- a/src/main/java/org/apache/gossip/model/GossipDataMessage.java
+++ b/src/main/java/org/apache/gossip/model/GossipDataMessage.java
@@@ -1,50 -1,0 +1,49 @@@
 +package org.apache.gossip.model;
 +
 +public class GossipDataMessage extends Base {
 +
-   
 +  private String nodeId;
 +  private String key;
 +  private Object payload;
 +  private Long timestamp;
 +  private Long expireAt;
 +  
 +  public String getNodeId() {
 +    return nodeId;
 +  }
 +  public void setNodeId(String nodeId) {
 +    this.nodeId = nodeId;
 +  }
 +  public String getKey() {
 +    return key;
 +  }
 +  public void setKey(String key) {
 +    this.key = key;
 +  }
 +  public Object getPayload() {
 +    return payload;
 +  }
 +  public void setPayload(Object payload) {
 +    this.payload = payload;
 +  }
 +  public Long getTimestamp() {
 +    return timestamp;
 +  }
 +  public void setTimestamp(Long timestamp) {
 +    this.timestamp = timestamp;
 +  }
 +  public Long getExpireAt() {
 +    return expireAt;
 +  }
 +  public void setExpireAt(Long expireAt) {
 +    this.expireAt = expireAt;
 +  }
 +  @Override
 +  public String toString() {
 +    return "GossipDataMessage [nodeId=" + nodeId + ", key=" + key + ", 
payload=" + payload
 +            + ", timestamp=" + timestamp + ", expireAt=" + expireAt + "]";
 +  }
 +
 +  
 +  
 +}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3d554f61/src/test/java/org/apache/gossip/DataTest.java
----------------------------------------------------------------------
diff --cc src/test/java/org/apache/gossip/DataTest.java
index 3f42eeb,0000000..6260f9b
mode 100644,000000..100644
--- a/src/test/java/org/apache/gossip/DataTest.java
+++ b/src/test/java/org/apache/gossip/DataTest.java
@@@ -1,81 -1,0 +1,81 @@@
 +package org.apache.gossip;
 +
 +import java.net.URI;
 +import java.net.URISyntaxException;
 +import java.net.UnknownHostException;
 +import java.util.ArrayList;
 +import java.util.List;
 +import java.util.UUID;
 +import java.util.concurrent.Callable;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.gossip.event.GossipListener;
 +import org.apache.gossip.event.GossipState;
 +import org.apache.gossip.model.GossipDataMessage;
 +import org.junit.Test;
 +
 +import io.teknek.tunit.TUnit;
 +
 +public class DataTest {
- 
++  
 +  @Test
 +  public void abc() throws InterruptedException, UnknownHostException, 
URISyntaxException{
 +    GossipSettings settings = new GossipSettings();
 +    String cluster = UUID.randomUUID().toString();
 +    int seedNodes = 1;
 +    List<GossipMember> startupMembers = new ArrayList<>();
 +    for (int i = 1; i < seedNodes+1; ++i) {
 +      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
 +      startupMembers.add(new RemoteGossipMember(cluster, uri, i + ""));
 +    }
 +    final List<GossipService> clients = new ArrayList<>();
 +    final int clusterMembers = 2;
 +    for (int i = 1; i < clusterMembers+1; ++i) {
 +      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
 +      GossipService gossipService = new GossipService(cluster, uri, i + "",
 +              startupMembers, settings,
 +              new GossipListener(){
 +        public void gossipEvent(GossipMember member, GossipState state) {
-           System.out.println(member + " " + state);
++          
 +        }
 +      });
 +      clients.add(gossipService);
 +      gossipService.start();
 +    }
 +    TUnit.assertThat(new Callable<Integer> (){
 +      public Integer call() throws Exception {
 +        int total = 0;
 +        for (int i = 0; i < clusterMembers; ++i) {
 +          total += clients.get(i).get_gossipManager().getLiveMembers().size();
 +        }
 +        return total;
 +      }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
 +    clients.get(0).gossipData(msg());
 +    Thread.sleep(10000);
 +    TUnit.assertThat(
 +            
 +            new Callable<Object> (){
 +              public Object call() throws Exception {
 +                GossipDataMessage x = clients.get(1).findGossipData(1+"" , 
"a");
 +                if (x == null) return "";
 +                else return x.getPayload();
 +              }})
 +            
 +            
 +            //() -> clients.get(1).findGossipData(1+"" , "a").getPayload())
 +    .afterWaitingAtMost(20, TimeUnit.SECONDS)
 +    .isEqualTo("b");
 +    for (int i = 0; i < clusterMembers; ++i) {
 +      clients.get(i).shutdown();
 +    }
 +  }
 +  
 +  private GossipDataMessage msg(){
 +    GossipDataMessage g = new GossipDataMessage();
 +    g.setExpireAt(Long.MAX_VALUE);
 +    g.setKey("a");
 +    g.setPayload("b");
 +    g.setTimestamp(System.currentTimeMillis());
 +    return g;
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3d554f61/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
----------------------------------------------------------------------
diff --cc src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
index 251550b,f0d7f10..82cb625
--- a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
+++ b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
@@@ -29,107 -29,111 +29,115 @@@ import java.util.UUID
  import java.util.concurrent.Callable;
  import java.util.concurrent.TimeUnit;
  
--
  import org.apache.log4j.Logger;
  
  import org.apache.gossip.event.GossipListener;
  import org.apache.gossip.event.GossipState;
+ import org.junit.platform.runner.JUnitPlatform;
  import org.junit.jupiter.api.Test;
  
+ import org.junit.runner.RunWith;
+ 
+ @RunWith(JUnitPlatform.class)
  public class ShutdownDeadtimeTest {
  
-   private static final Logger log = 
Logger.getLogger(ShutdownDeadtimeTest.class );
+   private static final Logger log = 
Logger.getLogger(ShutdownDeadtimeTest.class);
 -
++  
    @Test
-   //@Ignore
--  public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, 
UnknownHostException, URISyntaxException {
--      GossipSettings settings = new GossipSettings(1000, 10000);
--      String cluster = UUID.randomUUID().toString();
--      
--      log.info( "Adding seed nodes" );
--      int seedNodes = 3;
--      List<GossipMember> startupMembers = new ArrayList<>();
--      for (int i = 1; i < seedNodes + 1; ++i) {
--        URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
--        startupMembers.add(new RemoteGossipMember(cluster, uri, i + ""));
--      }
++  public void DeadNodesDoNotComeAliveAgain()
++          throws InterruptedException, UnknownHostException, 
URISyntaxException {
++    GossipSettings settings = new GossipSettings(1000, 10000);
++    String cluster = UUID.randomUUID().toString();
  
--      log.info( "Adding clients" );
--      final List<GossipService> clients = new ArrayList<>();
--      final int clusterMembers = 5;
--      for (int i = 1; i < clusterMembers+1; ++i) {
--          final int j = i;
--          URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
--          GossipService gossipService = new GossipService(cluster, uri, i + 
"",
--                  startupMembers, settings,
--                  new GossipListener(){
--                      @Override
--                      public void gossipEvent(GossipMember member, 
GossipState state) {
--                          System.out.println(System.currentTimeMillis() + " 
Member "+j + " reports "+ member+" "+ state);
--                      }
--                  });
--          clients.add(gossipService);
--          gossipService.start();
--      }
--      TUnit.assertThat(new Callable<Integer> (){
--          public Integer call() throws Exception {
--              int total = 0;
--              for (int i = 0; i < clusterMembers; ++i) {
--                  total += 
clients.get(i).get_gossipManager().getLiveMembers().size();
--              }
--              return total;
--          }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20);
++    log.info("Adding seed nodes");
++    int seedNodes = 3;
++    List<GossipMember> startupMembers = new ArrayList<>();
++    for (int i = 1; i < seedNodes + 1; ++i) {
++      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
++      startupMembers.add(new RemoteGossipMember(cluster, uri, i + ""));
++    }
  
--      // shutdown one client and verify that one client is lost.
--      Random r = new Random();
--      int randomClientId = r.nextInt(clusterMembers);
--      log.info( "shutting down " + randomClientId );
--      final int shutdownPort = 
clients.get(randomClientId).get_gossipManager().getMyself().getUri().getPort();
--      final String shutdownId = 
clients.get(randomClientId).get_gossipManager().getMyself().getId();
--      clients.get(randomClientId).shutdown();
--      TUnit.assertThat(new Callable<Integer> (){
--          public Integer call() throws Exception {
--              int total = 0;
--              for (int i = 0; i < clusterMembers; ++i) {
--                  total += 
clients.get(i).get_gossipManager().getLiveMembers().size();
--              }
--              return total;
--          }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(16);
--      clients.remove(randomClientId);
--      
--      TUnit.assertThat(new Callable<Integer> (){
--        public Integer call() throws Exception {
--            int total = 0;
--            for (int i = 0; i < clusterMembers - 1; ++i) {
--                total += 
clients.get(i).get_gossipManager().getDeadList().size();
--            }
--            return total;
--        }}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(4);
--      
--      URI uri = new URI("udp://" + "127.0.0.1" + ":" + shutdownPort);
--      // start client again
--      GossipService gossipService = new GossipService(cluster, uri, 
shutdownId + "",
--              startupMembers, settings,
--              new GossipListener(){
--                  @Override
--                  public void gossipEvent(GossipMember member, GossipState 
state) {
--                      //System.out.println("revived " + member+" "+ state);
--                  }
++    log.info("Adding clients");
++    final List<GossipService> clients = new ArrayList<>();
++    final int clusterMembers = 5;
++    for (int i = 1; i < clusterMembers + 1; ++i) {
++      final int j = i;
++      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
++      GossipService gossipService = new GossipService(cluster, uri, i + "", 
startupMembers,
++              settings, new GossipListener() {
++                @Override
++                public void gossipEvent(GossipMember member, GossipState 
state) {
++                  System.out.println(System.currentTimeMillis() + " Member " 
+ j + " reports "
++                          + member + " " + state);
++                }
                });
        clients.add(gossipService);
        gossipService.start();
++    }
++    TUnit.assertThat(new Callable<Integer>() {
++      public Integer call() throws Exception {
++        int total = 0;
++        for (int i = 0; i < clusterMembers; ++i) {
++          total += clients.get(i).get_gossipManager().getLiveMembers().size();
++        }
++        return total;
++      }
++    }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20);
  
--      // verify that the client is alive again for every node
--      TUnit.assertThat(new Callable<Integer> (){
--          public Integer call() throws Exception {
--              int total = 0;
--              for (int i = 0; i < clusterMembers; ++i) {
--                  total += 
clients.get(i).get_gossipManager().getLiveMembers().size();
++    // shutdown one client and verify that one client is lost.
++    Random r = new Random();
++    int randomClientId = r.nextInt(clusterMembers);
++    log.info("shutting down " + randomClientId);
++    final int shutdownPort = 
clients.get(randomClientId).get_gossipManager().getMyself().getUri()
++            .getPort();
++    final String shutdownId = 
clients.get(randomClientId).get_gossipManager().getMyself().getId();
++    clients.get(randomClientId).shutdown();
++    TUnit.assertThat(new Callable<Integer>() {
++      public Integer call() throws Exception {
++        int total = 0;
++        for (int i = 0; i < clusterMembers; ++i) {
++          total += clients.get(i).get_gossipManager().getLiveMembers().size();
++        }
++        return total;
++      }
++    }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(16);
++    clients.remove(randomClientId);
++
++    TUnit.assertThat(new Callable<Integer>() {
++      public Integer call() throws Exception {
++        int total = 0;
++        for (int i = 0; i < clusterMembers - 1; ++i) {
++          total += clients.get(i).get_gossipManager().getDeadList().size();
++        }
++        return total;
++      }
++    }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(4);
++
++    URI uri = new URI("udp://" + "127.0.0.1" + ":" + shutdownPort);
++    // start client again
++    GossipService gossipService = new GossipService(cluster, uri, shutdownId 
+ "", startupMembers,
++            settings, new GossipListener() {
++              @Override
++              public void gossipEvent(GossipMember member, GossipState state) 
{
++                // System.out.println("revived " + member+" "+ state);
                }
--              return total;
--          }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20);
--      
--      for (int i = 0; i < clusterMembers; ++i) {
--        clients.get(i).shutdown();
++            });
++    clients.add(gossipService);
++    gossipService.start();
++
++    // verify that the client is alive again for every node
++    TUnit.assertThat(new Callable<Integer>() {
++      public Integer call() throws Exception {
++        int total = 0;
++        for (int i = 0; i < clusterMembers; ++i) {
++          total += clients.get(i).get_gossipManager().getLiveMembers().size();
++        }
++        return total;
        }
++    }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20);
++
++    for (int i = 0; i < clusterMembers; ++i) {
++      clients.get(i).shutdown();
++    }
    }
  }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3d554f61/src/test/java/org/apache/gossip/StartupSettingsTest.java
----------------------------------------------------------------------
diff --cc src/test/java/org/apache/gossip/StartupSettingsTest.java
index ed069c3,9019ac1..3a52fc7
--- a/src/test/java/org/apache/gossip/StartupSettingsTest.java
+++ b/src/test/java/org/apache/gossip/StartupSettingsTest.java
@@@ -21,6 -21,6 +21,8 @@@ import org.apache.log4j.Logger
  import org.json.JSONException;
  
  import io.teknek.tunit.TUnit;
++
++import org.junit.After;
  import org.junit.jupiter.api.Test;
  
  import java.io.File;
@@@ -41,6 -44,6 +46,7 @@@ public class StartupSettingsTest 
    private static final Logger log = Logger.getLogger( 
StartupSettingsTest.class );
    private static final String CLUSTER = UUID.randomUUID().toString();
  
++  
    @Test
    public void testUsingSettingsFile() throws IOException, 
InterruptedException, JSONException, URISyntaxException {
      File settingsFile = File.createTempFile("gossipTest",".json");

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3d554f61/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
----------------------------------------------------------------------
diff --cc src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
index 350fc6f,c98b0d3..0faa968
--- a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
+++ b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
@@@ -15,7 -15,7 +15,7 @@@
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
--package org.apache.gossip;
++package org.apache.gossip; 
  
  import io.teknek.tunit.TUnit;
  
@@@ -33,8 -33,9 +33,10 @@@ import org.apache.log4j.Logger
  
  import org.apache.gossip.event.GossipListener;
  import org.apache.gossip.event.GossipState;
++import org.junit.After;
  import org.junit.jupiter.api.Test;
  
+ @RunWith(JUnitPlatform.class)
  public class TenNodeThreeSeedTest {
    private static final Logger log = Logger.getLogger( 
TenNodeThreeSeedTest.class );
  

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3d554f61/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
----------------------------------------------------------------------
diff --cc 
src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
index 6c63516,ab3242c..875a7ab
--- 
a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
+++ 
b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
@@@ -43,14 -45,14 +45,12 @@@ public class RandomGossipManagerBuilder
    public static class TestGossipListener implements GossipListener {
      @Override
      public void gossipEvent(GossipMember member, GossipState state) {
--      System.out.println("Got gossip event");
      }
    }
  
    public static class TestNotificationListener implements 
NotificationListener {
      @Override
      public void handleNotification(Notification notification, Object o) {
--      System.out.println("Got notification event");
      }
    }
  
@@@ -73,8 -75,8 +73,8 @@@
        expectThrows(IllegalArgumentException.class,() -> {
            
RandomGossipManager.newBuilder().withId("id").cluster("aCluster").build();
        });
--
    }
++  
    @Test
    public void createMembersListIfNull() throws URISyntaxException {
      RandomGossipManager gossipManager = RandomGossipManager.newBuilder()


Reply via email to