GOSSIP-21 Gossip user defined data
Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/3d554f61 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/3d554f61 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/3d554f61 Branch: refs/heads/master Commit: 3d554f610e1236c98613bf5c34bffcb5a800faea Parents: 5c660b2 5590758 Author: Edward Capriolo <edlinuxg...@gmail.com> Authored: Sun Oct 2 15:02:45 2016 -0400 Committer: Edward Capriolo <edlinuxg...@gmail.com> Committed: Sun Oct 2 15:02:45 2016 -0400 ---------------------------------------------------------------------- pom.xml | 80 +++++---- .../java/org/apache/gossip/GossipService.java | 4 +- .../gossip/manager/ActiveGossipThread.java | 78 +++----- .../org/apache/gossip/manager/GossipCore.java | 14 +- .../apache/gossip/manager/GossipManager.java | 15 +- .../gossip/manager/PassiveGossipThread.java | 13 -- .../OnlyProcessReceivedPassiveGossipThread.java | 80 --------- .../apache/gossip/model/GossipDataMessage.java | 1 - src/test/java/org/apache/gossip/DataTest.java | 4 +- .../org/apache/gossip/ShutdownDeadtimeTest.java | 178 ++++++++++--------- .../org/apache/gossip/StartupSettingsTest.java | 6 + .../org/apache/gossip/TenNodeThreeSeedTest.java | 8 +- .../manager/RandomGossipManagerBuilderTest.java | 8 +- 13 files changed, 185 insertions(+), 304 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3d554f61/src/main/java/org/apache/gossip/GossipService.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3d554f61/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java ---------------------------------------------------------------------- diff --cc src/main/java/org/apache/gossip/manager/ActiveGossipThread.java index 4790c09,b57c25a..19caffe --- a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java @@@ -20,22 -20,17 +20,23 @@@ package org.apache.gossip.manager import java.io.IOException; import java.net.DatagramSocket; import java.util.List; ++ +import java.util.Map.Entry; import java.util.Random; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.gossip.GossipService; + - import org.apache.gossip.GossipService; import org.apache.gossip.LocalGossipMember; import org.apache.gossip.model.ActiveGossipOk; +import org.apache.gossip.model.GossipDataMessage; import org.apache.gossip.model.GossipMember; import org.apache.gossip.model.Response; import org.apache.gossip.udp.UdpActiveGossipMessage; +import org.apache.gossip.udp.UdpGossipDataMessage; ++ import org.apache.log4j.Logger; import org.codehaus.jackson.map.ObjectMapper; @@@ -56,109 -51,41 +57,77 @@@ public class ActiveGossipThread public ActiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) { this.gossipManager = gossipManager; - this.scheduledExecutorService = Executors.newScheduledThreadPool(1024); + random = new Random(); this.gossipCore = gossipCore; - this.random = new Random(); + this.scheduledExecutorService = Executors.newScheduledThreadPool(1024); } -- - public void init(){ - Runnable liveGossip = new Runnable(){ - @Override - public void run() { - try { - sendMembershipList(gossipManager.getMyself(), gossipManager.getLiveMembers()); - } catch (RuntimeException ex){ - LOGGER.warn(ex); - } - } - }; - scheduledExecutorService.scheduleAtFixedRate(liveGossip, 0, ++ + public void init() { + scheduledExecutorService.scheduleAtFixedRate( + () -> sendMembershipList(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); - Runnable deadGossip = new Runnable(){ - @Override - public void run() { - try { - sendMembershipList(gossipManager.getMyself(), gossipManager.getDeadMembers()); - } catch (RuntimeException ex){ - LOGGER.warn(ex); - } - } - }; - scheduledExecutorService.scheduleAtFixedRate(deadGossip, 0, + scheduledExecutorService.scheduleAtFixedRate( + () -> sendMembershipList(gossipManager.getMyself(), gossipManager.getDeadMembers()), 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); - Runnable dataGossip = new Runnable(){ - @Override - public void run() { - try { - sendData(gossipManager.getMyself(), gossipManager.getLiveMembers()); - } catch (RuntimeException ex){ - LOGGER.warn(ex); - } - } - }; - scheduledExecutorService.scheduleAtFixedRate(dataGossip, 0, ++ scheduledExecutorService.scheduleAtFixedRate( ++ () -> sendData(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0, + gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); - } - + public void shutdown() { - this.scheduledExecutorService.shutdown(); + scheduledExecutorService.shutdown(); try { - this.scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS); + scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS); } catch (InterruptedException e) { - LOGGER.warn(e); - LOGGER.warn("Did not complete shutdown", e); ++ LOGGER.debug("Issue during shurdown" + e); } } + public void sendData(LocalGossipMember me, List<LocalGossipMember> memberList){ + LocalGossipMember member = selectPartner(memberList); + if (member == null) { - GossipService.LOGGER.debug("Send sendMembershipList() is called without action"); ++ LOGGER.debug("Send sendMembershipList() is called without action"); + return; + } + try (DatagramSocket socket = new DatagramSocket()) { + socket.setSoTimeout(gossipManager.getSettings().getGossipInterval()); + for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){ + for (Entry<String, GossipDataMessage> innerEntry : entry.getValue().entrySet()){ + UdpGossipDataMessage message = new UdpGossipDataMessage(); - System.out.println("sending message " + message); + message.setUuid(UUID.randomUUID().toString()); + message.setUriFrom(me.getId()); + message.setExpireAt(innerEntry.getValue().getExpireAt()); + message.setKey(innerEntry.getValue().getKey()); + message.setNodeId(innerEntry.getValue().getNodeId()); + message.setTimestamp(innerEntry.getValue().getTimestamp()); + message.setPayload(innerEntry.getValue().getPayload()); + message.setTimestamp(innerEntry.getValue().getTimestamp()); + byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes(); + int packet_length = json_bytes.length; + if (packet_length < GossipManager.MAX_PACKET_SIZE) { - //Response r = gossipCore.send(message, member.getUri()); + gossipCore.sendOneWay(message, member.getUri()); - //TODO: ack this message + } else { - GossipService.LOGGER.error("The length of the to be send message is too large (" ++ LOGGER.error("The length of the to be send message is too large (" + + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ")."); + } + } + } + } catch (IOException e1) { - GossipService.LOGGER.warn(e1); ++ LOGGER.warn(e1); + } + } + /** * Performs the sending of the membership list, after we have incremented our own heartbeat. */ - protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) { - protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) { -- ++ protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) { me.setHeartbeat(System.currentTimeMillis()); LocalGossipMember member = selectPartner(memberList); if (member == null) { -- GossipService.LOGGER.debug("Send sendMembershipList() is called without action"); ++ LOGGER.debug("Send sendMembershipList() is called without action"); return; } else { -- GossipService.LOGGER.debug("Send sendMembershipList() is called to " + member.toString()); ++ LOGGER.debug("Send sendMembershipList() is called to " + member.toString()); } try (DatagramSocket socket = new DatagramSocket()) { @@@ -180,33 -107,31 +149,30 @@@ LOGGER.warn("Message "+ message + " generated response "+ r); } } else { -- GossipService.LOGGER.error("The length of the to be send message is too large (" ++ LOGGER.error("The length of the to be send message is too large (" + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ")."); } } catch (IOException e1) { -- GossipService.LOGGER.warn(e1); ++ LOGGER.warn(e1); } } - ++ /** -- * Abstract method which should be implemented by a subclass. This method should return a member -- * of the list to gossip with. * * @param memberList * The list of members which are stored in the local list of members. * @return The chosen LocalGossipMember to gossip with. */ - protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) { - LocalGossipMember member = null; - if (memberList.size() > 0) { - int randomNeighborIndex = random.nextInt(memberList.size()); - member = memberList.get(randomNeighborIndex); - } else { - LOGGER.debug("I am alone in this world."); - } - return member; - } + protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) { + LocalGossipMember member = null; + if (memberList.size() > 0) { + int randomNeighborIndex = random.nextInt(memberList.size()); + member = memberList.get(randomNeighborIndex); + } else { - GossipService.LOGGER.debug("I am alone in this world."); - ++ LOGGER.debug("I am alone in this world."); + } + return member; + } private GossipMember convert(LocalGossipMember member){ GossipMember gm = new GossipMember(); http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3d554f61/src/main/java/org/apache/gossip/manager/GossipCore.java ---------------------------------------------------------------------- diff --cc src/main/java/org/apache/gossip/manager/GossipCore.java index 8bcba46,ab24621..46d855a --- a/src/main/java/org/apache/gossip/manager/GossipCore.java +++ b/src/main/java/org/apache/gossip/manager/GossipCore.java @@@ -44,20 -43,6 +44,24 @@@ public class GossipCore this.gossipManager = manager; requests = new ConcurrentHashMap<>(); service = Executors.newFixedThreadPool(500); + perNodeData = new ConcurrentHashMap<>(); + } + ++ /** ++ * ++ * @param message ++ */ + public void addPerNodeData(GossipDataMessage message){ + ConcurrentHashMap<String,GossipDataMessage> m = new ConcurrentHashMap<>(); + m.put(message.getKey(), message); + m = perNodeData.putIfAbsent(message.getNodeId(), m); + if (m != null){ + m.put(message.getKey(), message); //TODO only put if > ts + } + } + + public ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>> getPerNodeData(){ + return perNodeData; } public void shutdown(){ @@@ -77,15 -61,6 +80,10 @@@ requests.put(t.getUuid() + "/" + t.getUriFrom(), (Base) t); } } + if (base instanceof GossipDataMessage) { + UdpGossipDataMessage message = (UdpGossipDataMessage) base; + addPerNodeData(message); - /* - UdpActiveGossipOk o = new UdpActiveGossipOk(); - o.setUriFrom(message.getUriFrom()); - o.setUuid(message.getUuid()); - sendOneWay(o, senderMember.getUri());*/ + } if (base instanceof ActiveGossipMessage){ List<GossipMember> remoteGossipMembers = new ArrayList<>(); RemoteGossipMember senderMember = null; @@@ -178,11 -153,11 +176,11 @@@ } catch (InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { -- LOGGER.error(e.getMessage(), e); ++ LOGGER.debug(e.getMessage(), e); return null; } catch (TimeoutException e) { boolean cancelled = response.cancel(true); -- LOGGER.error(String.format("Threadpool timeout attempting to contact %s, cancelled ? %b", uri.toString(), cancelled)); ++ LOGGER.debug(String.format("Threadpool timeout attempting to contact %s, cancelled ? %b", uri.toString(), cancelled)); return null; } finally { if (t != null){ http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3d554f61/src/main/java/org/apache/gossip/manager/GossipManager.java ---------------------------------------------------------------------- diff --cc src/main/java/org/apache/gossip/manager/GossipManager.java index 36bc10a,0b2cfd2..94b57d1 --- a/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/src/main/java/org/apache/gossip/manager/GossipManager.java @@@ -41,9 -40,8 +41,11 @@@ import org.apache.gossip.LocalGossipMem import org.apache.gossip.event.GossipListener; import org.apache.gossip.event.GossipState; import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread; + +import org.apache.gossip.model.GossipDataMessage; + - public abstract class GossipManager extends Thread implements NotificationListener { ++ + public abstract class GossipManager implements NotificationListener { public static final Logger LOGGER = Logger.getLogger(GossipManager.class); @@@ -223,20 -213,4 +217,19 @@@ LOGGER.error(e); } } + + public void gossipData(GossipDataMessage message){ + message.setNodeId(me.getId()); + gossipCore.addPerNodeData(message); - System.out.println(this.getMyself() + " " + gossipCore.getPerNodeData()); + } + + public GossipDataMessage findGossipData(String nodeId, String key){ + ConcurrentHashMap<String, GossipDataMessage> j = gossipCore.getPerNodeData().get(nodeId); + if (j == null){ + return null; + } else { + return j.get(key); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3d554f61/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java ---------------------------------------------------------------------- diff --cc src/main/java/org/apache/gossip/manager/PassiveGossipThread.java index 6d440de,6d440de..11c371e --- a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java @@@ -121,17 -121,17 +121,4 @@@ abstract public class PassiveGossipThre } } -- /** -- * Abstract method for merging the local and remote list. -- * -- * @param gossipManager -- * The GossipManager for retrieving the local members and dead members list. -- * @param senderMember -- * The member who is sending this list, this could be used to send a response if the -- * remote list contains out-dated information. -- * @param remoteList -- * The list of members known at the remote side. -- */ -- abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, -- List<GossipMember> remoteList); } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3d554f61/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java ---------------------------------------------------------------------- diff --cc src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java index 0b51573,0b51573..dff5056 --- a/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java @@@ -17,11 -17,11 +17,6 @@@ */ package org.apache.gossip.manager.impl; --import java.util.List; -- --import org.apache.gossip.GossipMember; --import org.apache.gossip.LocalGossipMember; --import org.apache.gossip.RemoteGossipMember; import org.apache.gossip.manager.GossipCore; import org.apache.gossip.manager.GossipManager; import org.apache.gossip.manager.PassiveGossipThread; @@@ -35,79 -35,79 +30,4 @@@ public class OnlyProcessReceivedPassive super(gossipManager, gossipCore); } -- /** -- * Merge remote list (received from peer), and our local member list. Simply, we must update the -- * heartbeats that the remote list has with our list. Also, some additional logic is needed to -- * make sure we have not timed out a member and then immediately received a list with that member. -- * -- * @param gossipManager -- * @param senderMember -- * @param remoteList -- */ -- protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, -- List<GossipMember> remoteList) { -- -- // if the person sending to us is in the dead list consider them up -- for (LocalGossipMember i : gossipManager.getDeadList()) { -- if (i.getId().equals(senderMember.getId())) { -- LOGGER.info(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri()); -- LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getClusterName(), -- senderMember.getUri(), senderMember.getId(), -- senderMember.getHeartbeat(), gossipManager, gossipManager.getSettings() -- .getCleanupInterval()); -- gossipManager.reviveMember(newLocalMember); -- newLocalMember.startTimeoutTimer(); -- } -- } -- for (GossipMember remoteMember : remoteList) { -- if (remoteMember.getId().equals(gossipManager.getMyself().getId())) { -- continue; -- } -- if (gossipManager.getLiveMembers().contains(remoteMember)) { -- LocalGossipMember localMember = gossipManager.getLiveMembers().get( -- gossipManager.getLiveMembers().indexOf(remoteMember)); -- if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) { -- localMember.setHeartbeat(remoteMember.getHeartbeat()); -- localMember.resetTimeoutTimer(); -- } -- } else if (!gossipManager.getLiveMembers().contains(remoteMember) -- && !gossipManager.getDeadList().contains(remoteMember)) { -- LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(), -- remoteMember.getUri(), remoteMember.getId(), -- remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings() -- .getCleanupInterval()); -- gossipManager.createOrReviveMember(newLocalMember); -- newLocalMember.startTimeoutTimer(); -- } else { -- if (gossipManager.getDeadList().contains(remoteMember)) { -- LocalGossipMember localDeadMember = gossipManager.getDeadList().get( -- gossipManager.getDeadList().indexOf(remoteMember)); -- if (remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) { -- LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(), -- remoteMember.getUri(), remoteMember.getId(), -- remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings() -- .getCleanupInterval()); -- gossipManager.reviveMember(newLocalMember); -- newLocalMember.startTimeoutTimer(); -- LOGGER.debug("Removed remote member " + remoteMember.getAddress() -- + " from dead list and added to local member list."); -- } else { -- LOGGER.debug("me " + gossipManager.getMyself()); -- LOGGER.debug("sender " + senderMember); -- LOGGER.debug("remote " + remoteList); -- LOGGER.debug("live " + gossipManager.getLiveMembers()); -- LOGGER.debug("dead " + gossipManager.getDeadList()); -- } -- } else { -- LOGGER.debug("me " + gossipManager.getMyself()); -- LOGGER.debug("sender " + senderMember); -- LOGGER.debug("remote " + remoteList); -- LOGGER.debug("live " + gossipManager.getLiveMembers()); -- LOGGER.debug("dead " + gossipManager.getDeadList()); -- // throw new IllegalArgumentException("wtf"); -- } -- } -- } -- } -- } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3d554f61/src/main/java/org/apache/gossip/model/GossipDataMessage.java ---------------------------------------------------------------------- diff --cc src/main/java/org/apache/gossip/model/GossipDataMessage.java index 2128dfe,0000000..835c668 mode 100644,000000..100644 --- a/src/main/java/org/apache/gossip/model/GossipDataMessage.java +++ b/src/main/java/org/apache/gossip/model/GossipDataMessage.java @@@ -1,50 -1,0 +1,49 @@@ +package org.apache.gossip.model; + +public class GossipDataMessage extends Base { + - + private String nodeId; + private String key; + private Object payload; + private Long timestamp; + private Long expireAt; + + public String getNodeId() { + return nodeId; + } + public void setNodeId(String nodeId) { + this.nodeId = nodeId; + } + public String getKey() { + return key; + } + public void setKey(String key) { + this.key = key; + } + public Object getPayload() { + return payload; + } + public void setPayload(Object payload) { + this.payload = payload; + } + public Long getTimestamp() { + return timestamp; + } + public void setTimestamp(Long timestamp) { + this.timestamp = timestamp; + } + public Long getExpireAt() { + return expireAt; + } + public void setExpireAt(Long expireAt) { + this.expireAt = expireAt; + } + @Override + public String toString() { + return "GossipDataMessage [nodeId=" + nodeId + ", key=" + key + ", payload=" + payload + + ", timestamp=" + timestamp + ", expireAt=" + expireAt + "]"; + } + + + +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3d554f61/src/test/java/org/apache/gossip/DataTest.java ---------------------------------------------------------------------- diff --cc src/test/java/org/apache/gossip/DataTest.java index 3f42eeb,0000000..6260f9b mode 100644,000000..100644 --- a/src/test/java/org/apache/gossip/DataTest.java +++ b/src/test/java/org/apache/gossip/DataTest.java @@@ -1,81 -1,0 +1,81 @@@ +package org.apache.gossip; + +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import org.apache.gossip.event.GossipListener; +import org.apache.gossip.event.GossipState; +import org.apache.gossip.model.GossipDataMessage; +import org.junit.Test; + +import io.teknek.tunit.TUnit; + +public class DataTest { - ++ + @Test + public void abc() throws InterruptedException, UnknownHostException, URISyntaxException{ + GossipSettings settings = new GossipSettings(); + String cluster = UUID.randomUUID().toString(); + int seedNodes = 1; + List<GossipMember> startupMembers = new ArrayList<>(); + for (int i = 1; i < seedNodes+1; ++i) { + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + startupMembers.add(new RemoteGossipMember(cluster, uri, i + "")); + } + final List<GossipService> clients = new ArrayList<>(); + final int clusterMembers = 2; + for (int i = 1; i < clusterMembers+1; ++i) { + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + GossipService gossipService = new GossipService(cluster, uri, i + "", + startupMembers, settings, + new GossipListener(){ + public void gossipEvent(GossipMember member, GossipState state) { - System.out.println(member + " " + state); ++ + } + }); + clients.add(gossipService); + gossipService.start(); + } + TUnit.assertThat(new Callable<Integer> (){ + public Integer call() throws Exception { + int total = 0; + for (int i = 0; i < clusterMembers; ++i) { + total += clients.get(i).get_gossipManager().getLiveMembers().size(); + } + return total; + }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2); + clients.get(0).gossipData(msg()); + Thread.sleep(10000); + TUnit.assertThat( + + new Callable<Object> (){ + public Object call() throws Exception { + GossipDataMessage x = clients.get(1).findGossipData(1+"" , "a"); + if (x == null) return ""; + else return x.getPayload(); + }}) + + + //() -> clients.get(1).findGossipData(1+"" , "a").getPayload()) + .afterWaitingAtMost(20, TimeUnit.SECONDS) + .isEqualTo("b"); + for (int i = 0; i < clusterMembers; ++i) { + clients.get(i).shutdown(); + } + } + + private GossipDataMessage msg(){ + GossipDataMessage g = new GossipDataMessage(); + g.setExpireAt(Long.MAX_VALUE); + g.setKey("a"); + g.setPayload("b"); + g.setTimestamp(System.currentTimeMillis()); + return g; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3d554f61/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java ---------------------------------------------------------------------- diff --cc src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java index 251550b,f0d7f10..82cb625 --- a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java +++ b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java @@@ -29,107 -29,111 +29,115 @@@ import java.util.UUID import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; -- import org.apache.log4j.Logger; import org.apache.gossip.event.GossipListener; import org.apache.gossip.event.GossipState; + import org.junit.platform.runner.JUnitPlatform; import org.junit.jupiter.api.Test; + import org.junit.runner.RunWith; + + @RunWith(JUnitPlatform.class) public class ShutdownDeadtimeTest { - private static final Logger log = Logger.getLogger(ShutdownDeadtimeTest.class ); + private static final Logger log = Logger.getLogger(ShutdownDeadtimeTest.class); - ++ @Test - //@Ignore -- public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, UnknownHostException, URISyntaxException { -- GossipSettings settings = new GossipSettings(1000, 10000); -- String cluster = UUID.randomUUID().toString(); -- -- log.info( "Adding seed nodes" ); -- int seedNodes = 3; -- List<GossipMember> startupMembers = new ArrayList<>(); -- for (int i = 1; i < seedNodes + 1; ++i) { -- URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); -- startupMembers.add(new RemoteGossipMember(cluster, uri, i + "")); -- } ++ public void DeadNodesDoNotComeAliveAgain() ++ throws InterruptedException, UnknownHostException, URISyntaxException { ++ GossipSettings settings = new GossipSettings(1000, 10000); ++ String cluster = UUID.randomUUID().toString(); -- log.info( "Adding clients" ); -- final List<GossipService> clients = new ArrayList<>(); -- final int clusterMembers = 5; -- for (int i = 1; i < clusterMembers+1; ++i) { -- final int j = i; -- URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); -- GossipService gossipService = new GossipService(cluster, uri, i + "", -- startupMembers, settings, -- new GossipListener(){ -- @Override -- public void gossipEvent(GossipMember member, GossipState state) { -- System.out.println(System.currentTimeMillis() + " Member "+j + " reports "+ member+" "+ state); -- } -- }); -- clients.add(gossipService); -- gossipService.start(); -- } -- TUnit.assertThat(new Callable<Integer> (){ -- public Integer call() throws Exception { -- int total = 0; -- for (int i = 0; i < clusterMembers; ++i) { -- total += clients.get(i).get_gossipManager().getLiveMembers().size(); -- } -- return total; -- }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20); ++ log.info("Adding seed nodes"); ++ int seedNodes = 3; ++ List<GossipMember> startupMembers = new ArrayList<>(); ++ for (int i = 1; i < seedNodes + 1; ++i) { ++ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); ++ startupMembers.add(new RemoteGossipMember(cluster, uri, i + "")); ++ } -- // shutdown one client and verify that one client is lost. -- Random r = new Random(); -- int randomClientId = r.nextInt(clusterMembers); -- log.info( "shutting down " + randomClientId ); -- final int shutdownPort = clients.get(randomClientId).get_gossipManager().getMyself().getUri().getPort(); -- final String shutdownId = clients.get(randomClientId).get_gossipManager().getMyself().getId(); -- clients.get(randomClientId).shutdown(); -- TUnit.assertThat(new Callable<Integer> (){ -- public Integer call() throws Exception { -- int total = 0; -- for (int i = 0; i < clusterMembers; ++i) { -- total += clients.get(i).get_gossipManager().getLiveMembers().size(); -- } -- return total; -- }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(16); -- clients.remove(randomClientId); -- -- TUnit.assertThat(new Callable<Integer> (){ -- public Integer call() throws Exception { -- int total = 0; -- for (int i = 0; i < clusterMembers - 1; ++i) { -- total += clients.get(i).get_gossipManager().getDeadList().size(); -- } -- return total; -- }}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(4); -- -- URI uri = new URI("udp://" + "127.0.0.1" + ":" + shutdownPort); -- // start client again -- GossipService gossipService = new GossipService(cluster, uri, shutdownId + "", -- startupMembers, settings, -- new GossipListener(){ -- @Override -- public void gossipEvent(GossipMember member, GossipState state) { -- //System.out.println("revived " + member+" "+ state); -- } ++ log.info("Adding clients"); ++ final List<GossipService> clients = new ArrayList<>(); ++ final int clusterMembers = 5; ++ for (int i = 1; i < clusterMembers + 1; ++i) { ++ final int j = i; ++ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); ++ GossipService gossipService = new GossipService(cluster, uri, i + "", startupMembers, ++ settings, new GossipListener() { ++ @Override ++ public void gossipEvent(GossipMember member, GossipState state) { ++ System.out.println(System.currentTimeMillis() + " Member " + j + " reports " ++ + member + " " + state); ++ } }); clients.add(gossipService); gossipService.start(); ++ } ++ TUnit.assertThat(new Callable<Integer>() { ++ public Integer call() throws Exception { ++ int total = 0; ++ for (int i = 0; i < clusterMembers; ++i) { ++ total += clients.get(i).get_gossipManager().getLiveMembers().size(); ++ } ++ return total; ++ } ++ }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20); -- // verify that the client is alive again for every node -- TUnit.assertThat(new Callable<Integer> (){ -- public Integer call() throws Exception { -- int total = 0; -- for (int i = 0; i < clusterMembers; ++i) { -- total += clients.get(i).get_gossipManager().getLiveMembers().size(); ++ // shutdown one client and verify that one client is lost. ++ Random r = new Random(); ++ int randomClientId = r.nextInt(clusterMembers); ++ log.info("shutting down " + randomClientId); ++ final int shutdownPort = clients.get(randomClientId).get_gossipManager().getMyself().getUri() ++ .getPort(); ++ final String shutdownId = clients.get(randomClientId).get_gossipManager().getMyself().getId(); ++ clients.get(randomClientId).shutdown(); ++ TUnit.assertThat(new Callable<Integer>() { ++ public Integer call() throws Exception { ++ int total = 0; ++ for (int i = 0; i < clusterMembers; ++i) { ++ total += clients.get(i).get_gossipManager().getLiveMembers().size(); ++ } ++ return total; ++ } ++ }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(16); ++ clients.remove(randomClientId); ++ ++ TUnit.assertThat(new Callable<Integer>() { ++ public Integer call() throws Exception { ++ int total = 0; ++ for (int i = 0; i < clusterMembers - 1; ++i) { ++ total += clients.get(i).get_gossipManager().getDeadList().size(); ++ } ++ return total; ++ } ++ }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(4); ++ ++ URI uri = new URI("udp://" + "127.0.0.1" + ":" + shutdownPort); ++ // start client again ++ GossipService gossipService = new GossipService(cluster, uri, shutdownId + "", startupMembers, ++ settings, new GossipListener() { ++ @Override ++ public void gossipEvent(GossipMember member, GossipState state) { ++ // System.out.println("revived " + member+" "+ state); } -- return total; -- }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20); -- -- for (int i = 0; i < clusterMembers; ++i) { -- clients.get(i).shutdown(); ++ }); ++ clients.add(gossipService); ++ gossipService.start(); ++ ++ // verify that the client is alive again for every node ++ TUnit.assertThat(new Callable<Integer>() { ++ public Integer call() throws Exception { ++ int total = 0; ++ for (int i = 0; i < clusterMembers; ++i) { ++ total += clients.get(i).get_gossipManager().getLiveMembers().size(); ++ } ++ return total; } ++ }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20); ++ ++ for (int i = 0; i < clusterMembers; ++i) { ++ clients.get(i).shutdown(); ++ } } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3d554f61/src/test/java/org/apache/gossip/StartupSettingsTest.java ---------------------------------------------------------------------- diff --cc src/test/java/org/apache/gossip/StartupSettingsTest.java index ed069c3,9019ac1..3a52fc7 --- a/src/test/java/org/apache/gossip/StartupSettingsTest.java +++ b/src/test/java/org/apache/gossip/StartupSettingsTest.java @@@ -21,6 -21,6 +21,8 @@@ import org.apache.log4j.Logger import org.json.JSONException; import io.teknek.tunit.TUnit; ++ ++import org.junit.After; import org.junit.jupiter.api.Test; import java.io.File; @@@ -41,6 -44,6 +46,7 @@@ public class StartupSettingsTest private static final Logger log = Logger.getLogger( StartupSettingsTest.class ); private static final String CLUSTER = UUID.randomUUID().toString(); ++ @Test public void testUsingSettingsFile() throws IOException, InterruptedException, JSONException, URISyntaxException { File settingsFile = File.createTempFile("gossipTest",".json"); http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3d554f61/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java ---------------------------------------------------------------------- diff --cc src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java index 350fc6f,c98b0d3..0faa968 --- a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java +++ b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java @@@ -15,7 -15,7 +15,7 @@@ * See the License for the specific language governing permissions and * limitations under the License. */ --package org.apache.gossip; ++package org.apache.gossip; import io.teknek.tunit.TUnit; @@@ -33,8 -33,9 +33,10 @@@ import org.apache.log4j.Logger import org.apache.gossip.event.GossipListener; import org.apache.gossip.event.GossipState; ++import org.junit.After; import org.junit.jupiter.api.Test; + @RunWith(JUnitPlatform.class) public class TenNodeThreeSeedTest { private static final Logger log = Logger.getLogger( TenNodeThreeSeedTest.class ); http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3d554f61/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java ---------------------------------------------------------------------- diff --cc src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java index 6c63516,ab3242c..875a7ab --- a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java +++ b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java @@@ -43,14 -45,14 +45,12 @@@ public class RandomGossipManagerBuilder public static class TestGossipListener implements GossipListener { @Override public void gossipEvent(GossipMember member, GossipState state) { -- System.out.println("Got gossip event"); } } public static class TestNotificationListener implements NotificationListener { @Override public void handleNotification(Notification notification, Object o) { -- System.out.println("Got notification event"); } } @@@ -73,8 -75,8 +73,8 @@@ expectThrows(IllegalArgumentException.class,() -> { RandomGossipManager.newBuilder().withId("id").cluster("aCluster").build(); }); -- } ++ @Test public void createMembersListIfNull() throws URISyntaxException { RandomGossipManager gossipManager = RandomGossipManager.newBuilder()