Repository: incubator-gossip Updated Branches: refs/heads/master daea6edb1 -> f35dddd8f
GOSSIP-25 Create reaper process to expire per-node data Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/f35dddd8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/f35dddd8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/f35dddd8 Branch: refs/heads/master Commit: f35dddd8f21e3e7f9549a742644dba5250374d29 Parents: daea6ed Author: Edward Capriolo <edlinuxg...@gmail.com> Authored: Fri Oct 7 02:03:43 2016 -0400 Committer: Edward Capriolo <edlinuxg...@gmail.com> Committed: Fri Oct 7 02:03:43 2016 -0400 ---------------------------------------------------------------------- .../java/org/apache/gossip/GossipService.java | 30 +++++----- .../java/org/apache/gossip/manager/Clock.java | 8 +++ .../org/apache/gossip/manager/DataReaper.java | 58 ++++++++++++++++++++ .../org/apache/gossip/manager/GossipCore.java | 23 +++++--- .../apache/gossip/manager/GossipManager.java | 31 +++++++++-- .../gossip/manager/PassiveGossipThread.java | 8 +-- .../org/apache/gossip/manager/SystemClock.java | 15 +++++ src/test/java/org/apache/gossip/DataTest.java | 6 +- .../org/apache/gossip/ShutdownDeadtimeTest.java | 12 ++-- .../org/apache/gossip/StartupSettingsTest.java | 4 +- .../org/apache/gossip/TenNodeThreeSeedTest.java | 2 +- .../apache/gossip/manager/DataReaperTest.java | 55 +++++++++++++++++++ 12 files changed, 206 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/f35dddd8/src/main/java/org/apache/gossip/GossipService.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/GossipService.java b/src/main/java/org/apache/gossip/GossipService.java index 6c02e2c..ab0da97 100644 --- a/src/main/java/org/apache/gossip/GossipService.java +++ b/src/main/java/org/apache/gossip/GossipService.java @@ -24,19 +24,18 @@ import java.util.List; import org.apache.gossip.event.GossipListener; import org.apache.gossip.manager.GossipManager; import org.apache.gossip.manager.random.RandomGossipManager; -import org.apache.gossip.model.GossipDataMessage; +import org.apache.gossip.model.GossipDataMessage; import org.apache.log4j.Logger; /** * This object represents the service which is responsible for gossiping with other gossip members. * - * @author joshclemm, harmenw */ public class GossipService { public static final Logger LOGGER = Logger.getLogger(GossipService.class); - private GossipManager gossipManager; + private final GossipManager gossipManager; /** * Constructor with the default settings. @@ -71,7 +70,7 @@ public class GossipService { } public void start() { - LOGGER.debug("Starting: " + get_gossipManager().getMyself().getUri()); + LOGGER.debug("Starting: " + getGossipManager().getMyself().getUri()); gossipManager.init(); } @@ -79,25 +78,26 @@ public class GossipService { gossipManager.shutdown(); } - public GossipManager get_gossipManager() { + public GossipManager getGossipManager() { return gossipManager; } /** - * Gossip data to the entire cluster + * Gossip data in a namespace that is per-node { node-id { key->value } } * @param message */ - public void gossipData(GossipDataMessage message){ - gossipManager.gossipData(message); + public void gossipPerNodeData(GossipDataMessage message){ + gossipManager.gossipPerNodeData(message); } - - public GossipDataMessage findGossipData(String nodeId, String key){ - return this.get_gossipManager().findGossipData(nodeId, key); - } - - public void set_gossipManager(GossipManager _gossipManager) { - this.gossipManager = _gossipManager; + /** + * Retrieve per-node gossip data by key + * @param nodeId + * @param key + * @return return the value if found or null if not found or expired + */ + public GossipDataMessage findPerNodeData(String nodeId, String key){ + return getGossipManager().findGossipData(nodeId, key); } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/f35dddd8/src/main/java/org/apache/gossip/manager/Clock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/Clock.java b/src/main/java/org/apache/gossip/manager/Clock.java new file mode 100644 index 0000000..0e828f7 --- /dev/null +++ b/src/main/java/org/apache/gossip/manager/Clock.java @@ -0,0 +1,8 @@ +package org.apache.gossip.manager; + +public interface Clock { + + long currentTimeMillis(); + long nanoTime(); + +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/f35dddd8/src/main/java/org/apache/gossip/manager/DataReaper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/DataReaper.java b/src/main/java/org/apache/gossip/manager/DataReaper.java new file mode 100644 index 0000000..237ffb6 --- /dev/null +++ b/src/main/java/org/apache/gossip/manager/DataReaper.java @@ -0,0 +1,58 @@ +package org.apache.gossip.manager; + +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.gossip.model.GossipDataMessage; + +/** + * We wish to periodically sweep user data and remove entries past their timestamp. This + * implementation periodically sweeps through the data and removes old entries. While it might make + * sense to use a more specific high performance data-structure to handle eviction, keep in mind + * that we are not looking to store a large quantity of data as we currently have to transmit this + * data cluster wide. + */ +public class DataReaper { + + private final GossipCore gossipCore; + private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1); + private final Clock clock; + + public DataReaper(GossipCore gossipCore, Clock clock){ + this.gossipCore = gossipCore; + this.clock = clock; + } + + public void init(){ + Runnable reapPerNodeData = () -> { + runOnce(); + }; + scheduledExecutor.scheduleAtFixedRate(reapPerNodeData, 0, 5, TimeUnit.SECONDS); + } + + void runOnce(){ + for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> node : gossipCore.getPerNodeData().entrySet()){ + reapData(node.getValue()); + } + } + + void reapData(ConcurrentHashMap<String, GossipDataMessage> concurrentHashMap){ + for (Entry<String, GossipDataMessage> entry : concurrentHashMap.entrySet()){ + if (entry.getValue().getExpireAt() < clock.currentTimeMillis()){ + concurrentHashMap.remove(entry.getKey(), entry.getValue()); + } + } + } + + public void close(){ + scheduledExecutor.shutdown(); + try { + scheduledExecutor.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/f35dddd8/src/main/java/org/apache/gossip/manager/GossipCore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java index 46d855a..08ec5b4 100644 --- a/src/main/java/org/apache/gossip/manager/GossipCore.java +++ b/src/main/java/org/apache/gossip/manager/GossipCore.java @@ -47,16 +47,21 @@ public class GossipCore { perNodeData = new ConcurrentHashMap<>(); } - /** - * - * @param message - */ + public void addPerNodeData(GossipDataMessage message){ - ConcurrentHashMap<String,GossipDataMessage> m = new ConcurrentHashMap<>(); - m.put(message.getKey(), message); - m = perNodeData.putIfAbsent(message.getNodeId(), m); - if (m != null){ - m.put(message.getKey(), message); //TODO only put if > ts + ConcurrentHashMap<String,GossipDataMessage> nodeMap = new ConcurrentHashMap<>(); + nodeMap.put(message.getKey(), message); + nodeMap = perNodeData.putIfAbsent(message.getNodeId(), nodeMap); + if (nodeMap != null){ + //m.put(message.getKey(), message); //TODO only put if > ts + GossipDataMessage current = nodeMap.get(message.getKey()); + if (current == null){ + nodeMap.replace(message.getKey(), null, message); + } else { + if (current.getTimestamp() < message.getTimestamp()){ + nodeMap.replace(message.getKey(), current, message); + } + } } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/f35dddd8/src/main/java/org/apache/gossip/manager/GossipManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java index 94b57d1..3c66208 100644 --- a/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map.Entry; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; @@ -67,14 +68,20 @@ public abstract class GossipManager implements NotificationListener { private ExecutorService gossipThreadExecutor; - private GossipCore gossipCore; + private final GossipCore gossipCore; + + private final DataReaper dataReaper; + + private final Clock clock; public GossipManager(String cluster, URI uri, String id, GossipSettings settings, List<GossipMember> gossipMembers, GossipListener listener) { this.settings = settings; - this.gossipCore = new GossipCore(this); + gossipCore = new GossipCore(this); + clock = new SystemClock(); + dataReaper = new DataReaper(gossipCore, clock); me = new LocalGossipMember(cluster, uri, id, System.currentTimeMillis(), this, settings.getCleanupInterval()); members = new ConcurrentSkipListMap<>(); @@ -192,6 +199,7 @@ public abstract class GossipManager implements NotificationListener { gossipThreadExecutor.execute(passiveGossipThread); activeGossipThread = new ActiveGossipThread(this, this.gossipCore); activeGossipThread.init(); + dataReaper.init(); GossipService.LOGGER.debug("The GossipService is started."); } @@ -202,6 +210,7 @@ public abstract class GossipManager implements NotificationListener { gossipServiceRunning.set(false); gossipThreadExecutor.shutdown(); gossipCore.shutdown(); + dataReaper.close(); if (passiveGossipThread != null) { passiveGossipThread.shutdown(); } @@ -218,7 +227,10 @@ public abstract class GossipManager implements NotificationListener { } } - public void gossipData(GossipDataMessage message){ + public void gossipPerNodeData(GossipDataMessage message){ + Objects.nonNull(message.getKey()); + Objects.nonNull(message.getTimestamp()); + Objects.nonNull(message.getPayload()); message.setNodeId(me.getId()); gossipCore.addPerNodeData(message); } @@ -228,8 +240,19 @@ public abstract class GossipManager implements NotificationListener { if (j == null){ return null; } else { - return j.get(key); + GossipDataMessage l = j.get(key); + if (l == null){ + return null; + } + if (l.getExpireAt() != null && l.getExpireAt() < clock.currentTimeMillis()) { + return null; + } + return l; } } + + public DataReaper getDataReaper() { + return dataReaper; + } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/f35dddd8/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java index 11c371e..f2dce0b 100644 --- a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java @@ -23,15 +23,11 @@ import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.SocketException; -import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.gossip.GossipMember; -import org.apache.gossip.GossipService; import org.apache.gossip.model.Base; import org.apache.log4j.Logger; import org.codehaus.jackson.map.ObjectMapper; -import org.apache.gossip.RemoteGossipMember; /** * [The passive thread: reply to incoming gossip request.] This class handles the passive cycle, @@ -107,9 +103,9 @@ abstract public class PassiveGossipThread implements Runnable { } private void debug(int packetLength, byte[] jsonBytes) { - if (GossipService.LOGGER.isDebugEnabled()){ + if (LOGGER.isDebugEnabled()){ String receivedMessage = new String(jsonBytes); - GossipService.LOGGER.debug("Received message (" + packetLength + " bytes): " + LOGGER.debug("Received message (" + packetLength + " bytes): " + receivedMessage); } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/f35dddd8/src/main/java/org/apache/gossip/manager/SystemClock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/SystemClock.java b/src/main/java/org/apache/gossip/manager/SystemClock.java new file mode 100644 index 0000000..6d113b7 --- /dev/null +++ b/src/main/java/org/apache/gossip/manager/SystemClock.java @@ -0,0 +1,15 @@ +package org.apache.gossip.manager; + +public class SystemClock implements Clock { + + @Override + public long currentTimeMillis() { + return System.currentTimeMillis(); + } + + @Override + public long nanoTime() { + return System.nanoTime(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/f35dddd8/src/test/java/org/apache/gossip/DataTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/gossip/DataTest.java b/src/test/java/org/apache/gossip/DataTest.java index 6260f9b..02c89a8 100644 --- a/src/test/java/org/apache/gossip/DataTest.java +++ b/src/test/java/org/apache/gossip/DataTest.java @@ -46,17 +46,17 @@ public class DataTest { public Integer call() throws Exception { int total = 0; for (int i = 0; i < clusterMembers; ++i) { - total += clients.get(i).get_gossipManager().getLiveMembers().size(); + total += clients.get(i).getGossipManager().getLiveMembers().size(); } return total; }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2); - clients.get(0).gossipData(msg()); + clients.get(0).gossipPerNodeData(msg()); Thread.sleep(10000); TUnit.assertThat( new Callable<Object> (){ public Object call() throws Exception { - GossipDataMessage x = clients.get(1).findGossipData(1+"" , "a"); + GossipDataMessage x = clients.get(1).findPerNodeData(1+"" , "a"); if (x == null) return ""; else return x.getPayload(); }}) http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/f35dddd8/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java index 82cb625..383657d 100644 --- a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java +++ b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java @@ -78,7 +78,7 @@ public class ShutdownDeadtimeTest { public Integer call() throws Exception { int total = 0; for (int i = 0; i < clusterMembers; ++i) { - total += clients.get(i).get_gossipManager().getLiveMembers().size(); + total += clients.get(i).getGossipManager().getLiveMembers().size(); } return total; } @@ -88,15 +88,15 @@ public class ShutdownDeadtimeTest { Random r = new Random(); int randomClientId = r.nextInt(clusterMembers); log.info("shutting down " + randomClientId); - final int shutdownPort = clients.get(randomClientId).get_gossipManager().getMyself().getUri() + final int shutdownPort = clients.get(randomClientId).getGossipManager().getMyself().getUri() .getPort(); - final String shutdownId = clients.get(randomClientId).get_gossipManager().getMyself().getId(); + final String shutdownId = clients.get(randomClientId).getGossipManager().getMyself().getId(); clients.get(randomClientId).shutdown(); TUnit.assertThat(new Callable<Integer>() { public Integer call() throws Exception { int total = 0; for (int i = 0; i < clusterMembers; ++i) { - total += clients.get(i).get_gossipManager().getLiveMembers().size(); + total += clients.get(i).getGossipManager().getLiveMembers().size(); } return total; } @@ -107,7 +107,7 @@ public class ShutdownDeadtimeTest { public Integer call() throws Exception { int total = 0; for (int i = 0; i < clusterMembers - 1; ++i) { - total += clients.get(i).get_gossipManager().getDeadList().size(); + total += clients.get(i).getGossipManager().getDeadList().size(); } return total; } @@ -130,7 +130,7 @@ public class ShutdownDeadtimeTest { public Integer call() throws Exception { int total = 0; for (int i = 0; i < clusterMembers; ++i) { - total += clients.get(i).get_gossipManager().getLiveMembers().size(); + total += clients.get(i).getGossipManager().getLiveMembers().size(); } return total; } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/f35dddd8/src/test/java/org/apache/gossip/StartupSettingsTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/gossip/StartupSettingsTest.java b/src/test/java/org/apache/gossip/StartupSettingsTest.java index 3a52fc7..3b62836 100644 --- a/src/test/java/org/apache/gossip/StartupSettingsTest.java +++ b/src/test/java/org/apache/gossip/StartupSettingsTest.java @@ -62,7 +62,7 @@ public class StartupSettingsTest { TUnit.assertThat(new Callable<Integer> (){ public Integer call() throws Exception { - return firstService.get_gossipManager().getLiveMembers().size(); + return firstService.getGossipManager().getLiveMembers().size(); }}).afterWaitingAtMost(30, TimeUnit.SECONDS).isEqualTo(0); final GossipService serviceUnderTest = new GossipService( StartupSettings.fromJSONFile( settingsFile ) @@ -70,7 +70,7 @@ public class StartupSettingsTest { serviceUnderTest.start(); TUnit.assertThat(new Callable<Integer> (){ public Integer call() throws Exception { - return serviceUnderTest.get_gossipManager().getLiveMembers().size(); + return serviceUnderTest.getGossipManager().getLiveMembers().size(); }}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(1); firstService.shutdown(); serviceUnderTest.shutdown(); http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/f35dddd8/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java index 0faa968..e72ec67 100644 --- a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java +++ b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java @@ -82,7 +82,7 @@ public class TenNodeThreeSeedTest { public Integer call() throws Exception { int total = 0; for (int i = 0; i < clusterMembers; ++i) { - total += clients.get(i).get_gossipManager().getLiveMembers().size(); + total += clients.get(i).getGossipManager().getLiveMembers().size(); } return total; }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20); http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/f35dddd8/src/test/java/org/apache/gossip/manager/DataReaperTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/gossip/manager/DataReaperTest.java b/src/test/java/org/apache/gossip/manager/DataReaperTest.java new file mode 100644 index 0000000..4cd5dfe --- /dev/null +++ b/src/test/java/org/apache/gossip/manager/DataReaperTest.java @@ -0,0 +1,55 @@ +package org.apache.gossip.manager; + +import java.net.URI; + +import org.apache.gossip.GossipSettings; +import org.apache.gossip.manager.random.RandomGossipManager; +import org.apache.gossip.model.GossipDataMessage; +import org.junit.Assert; +import org.junit.Test; + +import io.teknek.tunit.TUnit; + +public class DataReaperTest { + + @Test + public void testReaperOneShot() { + String myId = "4"; + String key = "key"; + String value = "a"; + GossipSettings settings = new GossipSettings(); + GossipManager gm = RandomGossipManager.newBuilder().cluster("abc").settings(settings) + .withId(myId).uri(URI.create("udp://localhost:5000")).build(); + gm.gossipPerNodeData(perNodeDatum(key, value)); + Assert.assertEquals(value, gm.findGossipData(myId, key).getPayload()); + gm.getDataReaper().runOnce(); + TUnit.assertThat(() -> gm.findGossipData(myId, key)).equals(null); + } + + private GossipDataMessage perNodeDatum(String key, String value) { + GossipDataMessage m = new GossipDataMessage(); + m.setExpireAt(System.currentTimeMillis() + 5L); + m.setKey(key); + m.setPayload(value); + m.setTimestamp(System.currentTimeMillis()); + return m; + } + + @Test + public void testHigherTimestampWins() { + String myId = "4"; + String key = "key"; + String value = "a"; + GossipSettings settings = new GossipSettings(); + GossipManager gm = RandomGossipManager.newBuilder().cluster("abc").settings(settings) + .withId(myId).uri(URI.create("udp://localhost:5000")).build(); + GossipDataMessage before = perNodeDatum(key, value); + GossipDataMessage after = perNodeDatum(key, "b"); + after.setTimestamp(after.getTimestamp() - 1); + gm.gossipPerNodeData(before); + Assert.assertEquals(value, gm.findGossipData(myId, key).getPayload()); + gm.gossipPerNodeData(after); + Assert.assertEquals(value, gm.findGossipData(myId, key).getPayload()); + } + +}