Repository: incubator-gossip Updated Branches: refs/heads/master 78c9ae55a -> 428c0573f
GOSSIP-42 Use normal or expo distributions Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/428c0573 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/428c0573 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/428c0573 Branch: refs/heads/master Commit: 428c0573fb208a4094bf2fc87a6d0ab456014cde Parents: 78c9ae5 Author: Edward Capriolo <edlinuxg...@gmail.com> Authored: Tue Jan 17 22:59:07 2017 -0500 Committer: Edward Capriolo <edlinuxg...@gmail.com> Committed: Tue Jan 17 22:59:07 2017 -0500 ---------------------------------------------------------------------- .../java/org/apache/gossip/GossipSettings.java | 16 ++++++++-- .../org/apache/gossip/LocalGossipMember.java | 10 +++--- .../java/org/apache/gossip/StartupSettings.java | 4 ++- .../apache/gossip/accrual/FailureDetector.java | 32 +++++++++----------- .../org/apache/gossip/manager/GossipCore.java | 3 +- .../apache/gossip/manager/GossipManager.java | 5 ++- .../org/apache/gossip/GossipMemberTest.java | 4 +-- .../org/apache/gossip/ShutdownDeadtimeTest.java | 2 +- .../org/apache/gossip/StartupSettingsTest.java | 1 + .../gossip/accrual/FailureDetectorTest.java | 17 +++++++++-- .../manager/RandomGossipManagerBuilderTest.java | 2 +- 11 files changed, 60 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/428c0573/src/main/java/org/apache/gossip/GossipSettings.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/GossipSettings.java b/src/main/java/org/apache/gossip/GossipSettings.java index 12037fd..36fabb6 100644 --- a/src/main/java/org/apache/gossip/GossipSettings.java +++ b/src/main/java/org/apache/gossip/GossipSettings.java @@ -20,7 +20,6 @@ package org.apache.gossip; /** * In this object the settings used by the GossipService are held. * - * @author harmenw */ public class GossipSettings { @@ -39,6 +38,9 @@ public class GossipSettings { /** the threshold for the detector */ //private double convictThreshold = 2.606201185901408; private double convictThreshold = 4.5; + + private String distribution = "exponential"; + /** * Construct GossipSettings with default settings. */ @@ -53,12 +55,14 @@ public class GossipSettings { * @param cleanupInterval * The cleanup interval in ms. */ - public GossipSettings(int gossipInterval, int cleanupInterval, int windowSize, int minimumSamples, double convictThreshold) { + public GossipSettings(int gossipInterval, int cleanupInterval, int windowSize, + int minimumSamples, double convictThreshold, String distribution) { this.gossipInterval = gossipInterval; this.cleanupInterval = cleanupInterval; this.windowSize = windowSize; this.minimumSamples = minimumSamples; this.convictThreshold = convictThreshold; + this.distribution = distribution; } /** @@ -127,5 +131,13 @@ public class GossipSettings { public void setGossipInterval(int gossipInterval) { this.gossipInterval = gossipInterval; } + + public String getDistribution() { + return distribution; + } + + public void setDistribution(String distribution) { + this.distribution = distribution; + } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/428c0573/src/main/java/org/apache/gossip/LocalGossipMember.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/LocalGossipMember.java b/src/main/java/org/apache/gossip/LocalGossipMember.java index df3bb47..83a13df 100644 --- a/src/main/java/org/apache/gossip/LocalGossipMember.java +++ b/src/main/java/org/apache/gossip/LocalGossipMember.java @@ -40,9 +40,9 @@ public class LocalGossipMember extends GossipMember { * The current heartbeat */ public LocalGossipMember(String clusterName, URI uri, String id, - long heartbeat, int windowSize, int minSamples) { - super(clusterName, uri, id, heartbeat); - detector = new FailureDetector(this, minSamples, windowSize); + long heartbeat, int windowSize, int minSamples, String distribution) { + super(clusterName, uri, id, heartbeat ); + detector = new FailureDetector(this, minSamples, windowSize, distribution); } public void recordHeartbeat(long now){ @@ -63,6 +63,4 @@ public class LocalGossipMember extends GossipMember { + clusterName + ", id=" + id + ", currentdetect=" + d +" ]"; } - - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/428c0573/src/main/java/org/apache/gossip/StartupSettings.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/StartupSettings.java b/src/main/java/org/apache/gossip/StartupSettings.java index 38ccbf3..de63c66 100644 --- a/src/main/java/org/apache/gossip/StartupSettings.java +++ b/src/main/java/org/apache/gossip/StartupSettings.java @@ -171,12 +171,14 @@ public class StartupSettings { int minSamples = jsonObject.get("minimum_samples").intValue(); double convictThreshold = jsonObject.get("convict_threshold").asDouble(); String cluster = jsonObject.get("cluster").textValue(); + String distribution = jsonObject.get("distribution").textValue(); if (cluster == null){ throw new IllegalArgumentException("cluster was null. It is required"); } URI uri2 = new URI(uri); StartupSettings settings = new StartupSettings(id, uri2, - new GossipSettings(gossipInterval, cleanupInterval, windowSize, minSamples, convictThreshold), cluster); + new GossipSettings(gossipInterval, cleanupInterval, windowSize, + minSamples, convictThreshold, distribution), cluster); String configMembersDetails = "Config-members ["; JsonNode membersJSON = jsonObject.get("members"); Iterator<JsonNode> it = membersJSON.iterator(); http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/428c0573/src/main/java/org/apache/gossip/accrual/FailureDetector.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/accrual/FailureDetector.java b/src/main/java/org/apache/gossip/accrual/FailureDetector.java index 296e79f..b6a12b6 100644 --- a/src/main/java/org/apache/gossip/accrual/FailureDetector.java +++ b/src/main/java/org/apache/gossip/accrual/FailureDetector.java @@ -17,11 +17,9 @@ */ package org.apache.gossip.accrual; -import java.util.ArrayList; -import java.util.List; - import org.apache.commons.math.MathException; import org.apache.commons.math.distribution.ExponentialDistributionImpl; +import org.apache.commons.math.distribution.NormalDistributionImpl; import org.apache.commons.math.stat.descriptive.DescriptiveStatistics; import org.apache.gossip.LocalGossipMember; import org.apache.log4j.Logger; @@ -33,11 +31,13 @@ public class FailureDetector { private final long minimumSamples; private volatile long latestHeartbeatMs = -1; private final LocalGossipMember parent; + private final String distribution; - public FailureDetector(LocalGossipMember parent, long minimumSamples, int windowSize){ + public FailureDetector(LocalGossipMember parent, long minimumSamples, int windowSize, String distribution){ this.parent = parent; descriptiveStatistics = new DescriptiveStatistics(windowSize); this.minimumSamples = minimumSamples; + this.distribution = distribution; } /** @@ -60,26 +60,24 @@ public class FailureDetector { public Double computePhiMeasure(long now) { if (latestHeartbeatMs == -1 || descriptiveStatistics.getN() < minimumSamples) { LOGGER.debug( - String.format( "%s latests %s samples %s minumumSamples %s", parent.getId(), latestHeartbeatMs, descriptiveStatistics.getN(), minimumSamples)); + String.format( "%s latests %s samples %s minumumSamples %s", parent.getId(), + latestHeartbeatMs, descriptiveStatistics.getN(), minimumSamples)); return null; } synchronized (descriptiveStatistics) { long delta = now - latestHeartbeatMs; try { - //double probability = 1.0d - new NormalDistributionImpl(descriptiveStatistics.getMean(), descriptiveStatistics.getVariance()).cumulativeProbability(delta); - double probability = 1.0d - new ExponentialDistributionImpl(descriptiveStatistics.getMean()).cumulativeProbability(delta); - //LOGGER.warn (parent.getId() + " worked "+ -1.0d * Math.log10(probability)); + double probability = 0.0; + if (distribution.equals("normal")){ + double variance = descriptiveStatistics.getVariance(); + probability = 1.0d - new NormalDistributionImpl(descriptiveStatistics.getMean(), + variance == 0 ? 0.1 : variance).cumulativeProbability(delta); + } else { + probability = 1.0d - new ExponentialDistributionImpl(descriptiveStatistics.getMean()).cumulativeProbability(delta); + } return -1.0d * Math.log10(probability); } catch (MathException | IllegalArgumentException e) { - //LOGGER.warn(parent.getId() + " Exception while computing phi", e); - //LOGGER.warn(descriptiveStatistics); - //LOGGER.warn(descriptiveStatistics.getMean()); - List<Double> x = new ArrayList<>(); - for (double z : descriptiveStatistics.getValues()){ - x.add(z); - } - //LOGGER.warn(x); - //LOGGER.warn(parent.getId() + " " + descriptiveStatistics); + e.printStackTrace(); throw new IllegalArgumentException(e); } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/428c0573/src/main/java/org/apache/gossip/manager/GossipCore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java index de940c6..d315361 100644 --- a/src/main/java/org/apache/gossip/manager/GossipCore.java +++ b/src/main/java/org/apache/gossip/manager/GossipCore.java @@ -307,7 +307,8 @@ public class GossipCore { remoteMember.getId(), remoteMember.getHeartbeat(), gossipManager.getSettings().getWindowSize(), - gossipManager.getSettings().getMinimumSamples()); + gossipManager.getSettings().getMinimumSamples(), + gossipManager.getSettings().getDistribution()); aNewMember.recordHeartbeat(remoteMember.getHeartbeat()); Object result = gossipManager.getMembers().putIfAbsent(aNewMember, GossipState.UP); if (result != null){ http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/428c0573/src/main/java/org/apache/gossip/manager/GossipManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java index bb8c7fa..fb7ec93 100644 --- a/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -19,7 +19,6 @@ package org.apache.gossip.manager; import com.codahale.metrics.MetricRegistry; import java.net.URI; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map.Entry; @@ -87,13 +86,13 @@ public abstract class GossipManager { clock = new SystemClock(); dataReaper = new DataReaper(gossipCore, clock); me = new LocalGossipMember(cluster, uri, id, clock.nanoTime(), - settings.getWindowSize(), settings.getMinimumSamples()); + settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution()); members = new ConcurrentSkipListMap<>(); for (GossipMember startupMember : gossipMembers) { if (!startupMember.equals(me)) { LocalGossipMember member = new LocalGossipMember(startupMember.getClusterName(), startupMember.getUri(), startupMember.getId(), - clock.nanoTime(), settings.getWindowSize(), settings.getMinimumSamples()); + clock.nanoTime(), settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution()); //TODO should members start in down state? members.put(member, GossipState.DOWN); } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/428c0573/src/test/java/org/apache/gossip/GossipMemberTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/gossip/GossipMemberTest.java b/src/test/java/org/apache/gossip/GossipMemberTest.java index c2560d5..e15259e 100644 --- a/src/test/java/org/apache/gossip/GossipMemberTest.java +++ b/src/test/java/org/apache/gossip/GossipMemberTest.java @@ -31,9 +31,9 @@ public class GossipMemberTest { @Test public void testHashCodeFromGossip40() throws URISyntaxException { Assert.assertNotEquals( - new LocalGossipMember("mycluster", new URI("udp://4.4.4.4:1000"), "myid", 1, 10, 5) + new LocalGossipMember("mycluster", new URI("udp://4.4.4.4:1000"), "myid", 1, 10, 5, "exponential") .hashCode(), - new LocalGossipMember("mycluster", new URI("udp://4.4.4.5:1005"), "yourid", 11, 11, 6) + new LocalGossipMember("mycluster", new URI("udp://4.4.4.5:1005"), "yourid", 11, 11, 6, "exponential") .hashCode()); } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/428c0573/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java index 7b66fbc..f5e34ba 100644 --- a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java +++ b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java @@ -46,7 +46,7 @@ public class ShutdownDeadtimeTest { @Test public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, UnknownHostException, URISyntaxException { - GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 5.0); + GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 5.0, "exponential"); String cluster = UUID.randomUUID().toString(); int seedNodes = 3; List<GossipMember> startupMembers = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/428c0573/src/test/java/org/apache/gossip/StartupSettingsTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/gossip/StartupSettingsTest.java b/src/test/java/org/apache/gossip/StartupSettingsTest.java index 61bd4f4..f0acabf 100644 --- a/src/test/java/org/apache/gossip/StartupSettingsTest.java +++ b/src/test/java/org/apache/gossip/StartupSettingsTest.java @@ -69,6 +69,7 @@ public class StartupSettingsTest { " \"minimum_samples\":5,\n" + " \"cleanup_interval\":10000,\n" + " \"convict_threshold\":2.6,\n" + + " \"distribution\":\"exponential\",\n" + " \"members\":[\n" + " {\"cluster\": \"" + CLUSTER + "\",\"uri\":\"udp://127.0.0.1:5000\"}\n" + " ]\n" + http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/428c0573/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java b/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java index f9ff9ff..99cf9c8 100644 --- a/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java +++ b/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java @@ -28,12 +28,24 @@ import org.junit.runner.RunWith; @RunWith(JUnitPlatform.class) public class FailureDetectorTest { + + @Test + public void aNormalTest(){ + int samples = 1; + int windowSize = 1000; + LocalGossipMember member = new LocalGossipMember("", URI.create("udp://127.0.0.1:1000"), + "", 0L, windowSize, samples, "normal"); + member.recordHeartbeat(5); + member.recordHeartbeat(10); + Assert.assertEquals(new Double(0.3010299956639812), member.detect(10)); + } @Test public void aTest(){ int samples = 1; int windowSize = 1000; - LocalGossipMember member = new LocalGossipMember("", URI.create("udp://127.0.0.1:1000"), "", 0L, windowSize, samples); + LocalGossipMember member = new LocalGossipMember("", URI.create("udp://127.0.0.1:1000"), + "", 0L, windowSize, samples, "exponential"); member.recordHeartbeat(5); member.recordHeartbeat(10); Assert.assertEquals(new Double(0.4342944819032518), member.detect(10)); @@ -52,7 +64,8 @@ public class FailureDetectorTest { public void sameHeartbeatTest(){ int samples = 1; int windowSize = 1000; - LocalGossipMember member = new LocalGossipMember("", URI.create("udp://127.0.0.1:1000"), "", 0L, windowSize, samples); + LocalGossipMember member = new LocalGossipMember("", URI.create("udp://127.0.0.1:1000"), + "", 0L, windowSize, samples, "exponential"); member.recordHeartbeat(5); member.recordHeartbeat(5); member.recordHeartbeat(5); http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/428c0573/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java index 0c5aa88..1ef3a5b 100644 --- a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java +++ b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java @@ -91,7 +91,7 @@ public class RandomGossipManagerBuilderTest { public void useMemberListIfProvided() throws URISyntaxException { LocalGossipMember member = new LocalGossipMember( "aCluster", new URI("udp://localhost:2000"), "aGossipMember", - System.nanoTime(), 1000, 1); + System.nanoTime(), 1000, 1, "exponential"); List<GossipMember> memberList = new ArrayList<>(); memberList.add(member); RandomGossipManager gossipManager = RandomGossipManager.newBuilder()