GOSSIP-78 refactor into a multi-module maven project * add *.ipr to .gitignore * modify existing pom to be a parent. create new pom for gossip-core. * I left all properties and dependencies in the parent, as they seemed to be a fairly general set of dependencies. move all the code. * rename parent module: gossip -> gossip-parent * move dependencies into child module
Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/298b1ae3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/298b1ae3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/298b1ae3 Branch: refs/heads/master Commit: 298b1ae3aeba575f39c6854e8f8039a6284cd470 Parents: 6a4d50c Author: Gary Dusbabek <[email protected]> Authored: Thu Apr 13 08:20:06 2017 -0500 Committer: Gary Dusbabek <[email protected]> Committed: Thu Apr 13 10:12:23 2017 -0500 ---------------------------------------------------------------------- .gitignore | 1 + gossip-base/pom.xml | 91 +++++ .../java/org/apache/gossip/GossipSettings.java | 226 +++++++++++ .../java/org/apache/gossip/LocalMember.java | 71 ++++ .../src/main/java/org/apache/gossip/Member.java | 166 ++++++++ .../java/org/apache/gossip/RemoteMember.java | 47 +++ .../java/org/apache/gossip/StartupSettings.java | 207 ++++++++++ .../apache/gossip/accrual/FailureDetector.java | 80 ++++ .../main/java/org/apache/gossip/crdt/Crdt.java | 39 ++ .../apache/gossip/crdt/CrdtBiFunctionMerge.java | 55 +++ .../org/apache/gossip/crdt/CrdtCounter.java | 24 ++ .../java/org/apache/gossip/crdt/CrdtModule.java | 69 ++++ .../java/org/apache/gossip/crdt/CrdtSet.java | 26 ++ .../org/apache/gossip/crdt/GrowOnlyCounter.java | 119 ++++++ .../org/apache/gossip/crdt/GrowOnlySet.java | 157 ++++++++ .../main/java/org/apache/gossip/crdt/OrSet.java | 304 +++++++++++++++ .../org/apache/gossip/event/GossipListener.java | 24 ++ .../org/apache/gossip/event/GossipState.java | 28 ++ .../examples/StandAloneDatacenterAndRack.java | 62 +++ .../apache/gossip/examples/StandAloneNode.java | 47 +++ .../examples/StandAloneNodeCrdtOrSet.java | 115 ++++++ .../gossip/manager/AbstractActiveGossiper.java | 171 ++++++++ .../java/org/apache/gossip/manager/Clock.java | 25 ++ .../org/apache/gossip/manager/DataReaper.java | 85 ++++ .../DatacenterRackAwareActiveGossiper.java | 244 ++++++++++++ .../org/apache/gossip/manager/GossipCore.java | 387 +++++++++++++++++++ .../gossip/manager/GossipCoreConstants.java | 30 ++ .../apache/gossip/manager/GossipManager.java | 319 +++++++++++++++ .../gossip/manager/GossipManagerBuilder.java | 152 ++++++++ .../manager/GossipMemberStateRefresher.java | 121 ++++++ .../gossip/manager/PassiveGossipConstants.java | 23 ++ .../gossip/manager/PassiveGossipThread.java | 114 ++++++ .../gossip/manager/RingStatePersister.java | 76 ++++ .../gossip/manager/SimpleActiveGossipper.java | 110 ++++++ .../org/apache/gossip/manager/SystemClock.java | 32 ++ .../gossip/manager/UserDataPersister.java | 107 +++++ .../handlers/ActiveGossipMessageHandler.java | 74 ++++ .../manager/handlers/DefaultMessageInvoker.java | 40 ++ .../gossip/manager/handlers/MessageHandler.java | 26 ++ .../gossip/manager/handlers/MessageInvoker.java | 33 ++ .../handlers/MessageInvokerCombiner.java | 48 +++ .../handlers/PerNodeDataMessageHandler.java | 31 ++ .../manager/handlers/ResponseHandler.java | 33 ++ .../handlers/SharedDataMessageHandler.java | 31 ++ .../handlers/ShutdownMessageHandler.java | 38 ++ .../manager/handlers/SimpleMessageInvoker.java | 45 +++ .../OnlyProcessReceivedPassiveGossipThread.java | 33 ++ .../gossip/model/ActiveGossipMessage.java | 39 ++ .../org/apache/gossip/model/ActiveGossipOk.java | 22 ++ .../main/java/org/apache/gossip/model/Base.java | 49 +++ .../java/org/apache/gossip/model/Fault.java | 40 ++ .../java/org/apache/gossip/model/Member.java | 87 +++++ .../java/org/apache/gossip/model/Message.java | 22 ++ .../apache/gossip/model/NotAMemberFault.java | 29 ++ .../apache/gossip/model/PerNodeDataMessage.java | 66 ++++ .../java/org/apache/gossip/model/Response.java | 22 ++ .../apache/gossip/model/SharedDataMessage.java | 64 +++ .../apache/gossip/model/ShutdownMessage.java | 51 +++ .../org/apache/gossip/model/SignedPayload.java | 36 ++ .../java/org/apache/gossip/secure/KeyTool.java | 57 +++ .../java/org/apache/gossip/udp/Trackable.java | 30 ++ .../gossip/udp/UdpActiveGossipMessage.java | 49 +++ .../apache/gossip/udp/UdpActiveGossipOk.java | 44 +++ .../apache/gossip/udp/UdpNotAMemberFault.java | 46 +++ .../gossip/udp/UdpPerNodeDataMessage.java | 48 +++ .../apache/gossip/udp/UdpSharedDataMessage.java | 50 +++ gossip-base/src/main/resources/log4j.properties | 20 + .../apache/gossip/AbstractIntegrationBase.java | 50 +++ .../test/java/org/apache/gossip/DataTest.java | 238 ++++++++++++ .../org/apache/gossip/IdAndPropertyTest.java | 91 +++++ .../test/java/org/apache/gossip/MemberTest.java | 40 ++ .../org/apache/gossip/ShutdownDeadtimeTest.java | 146 +++++++ .../org/apache/gossip/SignedMessageTest.java | 135 +++++++ .../org/apache/gossip/StartupSettingsTest.java | 86 +++++ .../org/apache/gossip/TenNodeThreeSeedTest.java | 92 +++++ .../gossip/accrual/FailureDetectorTest.java | 113 ++++++ .../apache/gossip/crdt/GrowOnlyCounterTest.java | 54 +++ .../org/apache/gossip/crdt/GrowOnlySetTest.java | 38 ++ .../java/org/apache/gossip/crdt/OrSetTest.java | 115 ++++++ .../apache/gossip/manager/DataReaperTest.java | 104 +++++ .../manager/GossipManagerBuilderTest.java | 121 ++++++ .../gossip/manager/RingPersistenceTest.java | 64 +++ .../gossip/manager/UserDataPersistenceTest.java | 114 ++++++ .../manager/handlers/MessageInvokerTest.java | 178 +++++++++ gossip-base/src/test/resources/log4j.properties | 20 + pom.xml | 207 ++++------ .../java/org/apache/gossip/GossipSettings.java | 226 ----------- .../java/org/apache/gossip/LocalMember.java | 71 ---- src/main/java/org/apache/gossip/Member.java | 166 -------- .../java/org/apache/gossip/RemoteMember.java | 47 --- .../java/org/apache/gossip/StartupSettings.java | 207 ---------- .../apache/gossip/accrual/FailureDetector.java | 80 ---- src/main/java/org/apache/gossip/crdt/Crdt.java | 39 -- .../apache/gossip/crdt/CrdtBiFunctionMerge.java | 55 --- .../org/apache/gossip/crdt/CrdtCounter.java | 24 -- .../java/org/apache/gossip/crdt/CrdtModule.java | 69 ---- .../java/org/apache/gossip/crdt/CrdtSet.java | 26 -- .../org/apache/gossip/crdt/GrowOnlyCounter.java | 119 ------ .../org/apache/gossip/crdt/GrowOnlySet.java | 157 -------- src/main/java/org/apache/gossip/crdt/OrSet.java | 304 --------------- .../org/apache/gossip/event/GossipListener.java | 24 -- .../org/apache/gossip/event/GossipState.java | 28 -- .../examples/StandAloneDatacenterAndRack.java | 62 --- .../apache/gossip/examples/StandAloneNode.java | 47 --- .../examples/StandAloneNodeCrdtOrSet.java | 115 ------ .../gossip/manager/AbstractActiveGossiper.java | 171 -------- .../java/org/apache/gossip/manager/Clock.java | 25 -- .../org/apache/gossip/manager/DataReaper.java | 85 ---- .../DatacenterRackAwareActiveGossiper.java | 244 ------------ .../org/apache/gossip/manager/GossipCore.java | 387 ------------------- .../gossip/manager/GossipCoreConstants.java | 30 -- .../apache/gossip/manager/GossipManager.java | 319 --------------- .../gossip/manager/GossipManagerBuilder.java | 152 -------- .../manager/GossipMemberStateRefresher.java | 121 ------ .../gossip/manager/PassiveGossipConstants.java | 23 -- .../gossip/manager/PassiveGossipThread.java | 114 ------ .../gossip/manager/RingStatePersister.java | 76 ---- .../gossip/manager/SimpleActiveGossipper.java | 110 ------ .../org/apache/gossip/manager/SystemClock.java | 32 -- .../gossip/manager/UserDataPersister.java | 107 ----- .../handlers/ActiveGossipMessageHandler.java | 74 ---- .../manager/handlers/DefaultMessageInvoker.java | 40 -- .../gossip/manager/handlers/MessageHandler.java | 26 -- .../gossip/manager/handlers/MessageInvoker.java | 33 -- .../handlers/MessageInvokerCombiner.java | 48 --- .../handlers/PerNodeDataMessageHandler.java | 31 -- .../manager/handlers/ResponseHandler.java | 33 -- .../handlers/SharedDataMessageHandler.java | 31 -- .../handlers/ShutdownMessageHandler.java | 38 -- .../manager/handlers/SimpleMessageInvoker.java | 45 --- .../OnlyProcessReceivedPassiveGossipThread.java | 33 -- .../gossip/model/ActiveGossipMessage.java | 39 -- .../org/apache/gossip/model/ActiveGossipOk.java | 22 -- src/main/java/org/apache/gossip/model/Base.java | 49 --- .../java/org/apache/gossip/model/Fault.java | 40 -- .../java/org/apache/gossip/model/Member.java | 87 ----- .../java/org/apache/gossip/model/Message.java | 22 -- .../apache/gossip/model/NotAMemberFault.java | 29 -- .../apache/gossip/model/PerNodeDataMessage.java | 66 ---- .../java/org/apache/gossip/model/Response.java | 22 -- .../apache/gossip/model/SharedDataMessage.java | 64 --- .../apache/gossip/model/ShutdownMessage.java | 51 --- .../org/apache/gossip/model/SignedPayload.java | 36 -- .../java/org/apache/gossip/secure/KeyTool.java | 57 --- .../java/org/apache/gossip/udp/Trackable.java | 30 -- .../gossip/udp/UdpActiveGossipMessage.java | 49 --- .../apache/gossip/udp/UdpActiveGossipOk.java | 44 --- .../apache/gossip/udp/UdpNotAMemberFault.java | 46 --- .../gossip/udp/UdpPerNodeDataMessage.java | 48 --- .../apache/gossip/udp/UdpSharedDataMessage.java | 50 --- src/main/resources/log4j.properties | 20 - .../apache/gossip/AbstractIntegrationBase.java | 50 --- src/test/java/org/apache/gossip/DataTest.java | 238 ------------ .../org/apache/gossip/IdAndPropertyTest.java | 91 ----- src/test/java/org/apache/gossip/MemberTest.java | 40 -- .../org/apache/gossip/ShutdownDeadtimeTest.java | 146 ------- .../org/apache/gossip/SignedMessageTest.java | 135 ------- .../org/apache/gossip/StartupSettingsTest.java | 86 ----- .../org/apache/gossip/TenNodeThreeSeedTest.java | 92 ----- .../gossip/accrual/FailureDetectorTest.java | 113 ------ .../apache/gossip/crdt/GrowOnlyCounterTest.java | 54 --- .../org/apache/gossip/crdt/GrowOnlySetTest.java | 38 -- .../java/org/apache/gossip/crdt/OrSetTest.java | 115 ------ .../apache/gossip/manager/DataReaperTest.java | 104 ----- .../manager/GossipManagerBuilderTest.java | 121 ------ .../gossip/manager/RingPersistenceTest.java | 64 --- .../gossip/manager/UserDataPersistenceTest.java | 114 ------ .../manager/handlers/MessageInvokerTest.java | 178 --------- src/test/resources/log4j.properties | 20 - 169 files changed, 7126 insertions(+), 7101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index fc368f3..fab2364 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ .idea/ *.iml *.iws +*.ipr # Mac .DS_Store http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/pom.xml ---------------------------------------------------------------------- diff --git a/gossip-base/pom.xml b/gossip-base/pom.xml new file mode 100644 index 0000000..3529bd1 --- /dev/null +++ b/gossip-base/pom.xml @@ -0,0 +1,91 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.gossip</groupId> + <artifactId>gossip-parent</artifactId> + <version>0.1.3-incubating-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <name>Gossip Base</name> + <artifactId>gossip-base</artifactId> + <version>0.1.3-incubating-SNAPSHOT</version> + + <dependencies> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + <version>${jackson.version}</version> + </dependency> + <dependency> + <groupId>commons-math</groupId> + <artifactId>commons-math</artifactId> + <version>${commons-math.version}</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${jackson.version}</version> + </dependency> + <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-core</artifactId> + <version>${metrics.version}</version></dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-api</artifactId> + <version>${junit.jupiter.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-engine</artifactId> + <version>${junit.jupiter.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.vintage</groupId> + <artifactId>junit-vintage-engine</artifactId> + <version>${junit.vintage.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.platform</groupId> + <artifactId>junit-platform-runner</artifactId> + <version>${junit.platform.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>io.teknek</groupId> + <artifactId>tunit</artifactId> + <version>${tunit.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <version>${log4j.version}</version> + <type>jar</type> + <scope>compile</scope> + <exclusions> + <exclusion> + <groupId>javax.jms</groupId> + <artifactId>jms</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jdmk</groupId> + <artifactId>jmxtools</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jmx</groupId> + <artifactId>jmxri</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java new file mode 100644 index 0000000..6b2bf8b --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip; + +import java.util.HashMap; +import java.util.Map; + +/** + * In this object the settings used by the GossipService are held. + * + */ +public class GossipSettings { + + /** Time between gossip'ing in ms. Default is 1 second. */ + private int gossipInterval = 10; + + /** Time between cleanups in ms. Default is 10 seconds. */ + private int cleanupInterval = 5000; + + /** the minimum samples needed before reporting a result */ + private int minimumSamples = 5; + + /** the number of samples to keep per host */ + private int windowSize = 5000; + + /** the threshold for the detector */ + private double convictThreshold = 10; + + private String distribution = "normal"; + + private String activeGossipClass = "org.apache.gossip.manager.SimpleActiveGossipper"; + + private Map<String,String> activeGossipProperties = new HashMap<>(); + + private String pathToRingState = "./"; + + private boolean persistRingState = true; + + private String pathToDataState = "./"; + + private boolean persistDataState = true; + + private String pathToKeyStore = "./keys"; + + private boolean signMessages = false; + + + /** + * Construct GossipSettings with default settings. + */ + public GossipSettings() { + } + + /** + * Construct GossipSettings with given settings. + * + * @param gossipInterval + * The gossip interval in ms. + * @param cleanupInterval + * The cleanup interval in ms. + */ + public GossipSettings(int gossipInterval, int cleanupInterval, int windowSize, + int minimumSamples, double convictThreshold, String distribution) { + this.gossipInterval = gossipInterval; + this.cleanupInterval = cleanupInterval; + this.windowSize = windowSize; + this.minimumSamples = minimumSamples; + this.convictThreshold = convictThreshold; + this.distribution = distribution; + } + + /** + * Set the gossip interval. This is the time between a gossip message is send. + * + * @param gossipInterval + * The gossip interval in ms. + */ + public void setGossipTimeout(int gossipInterval) { + this.gossipInterval = gossipInterval; + } + + /** + * Set the cleanup interval. This is the time between the last heartbeat received from a member + * and when it will be marked as dead. + * + * @param cleanupInterval + * The cleanup interval in ms. + */ + public void setCleanupInterval(int cleanupInterval) { + this.cleanupInterval = cleanupInterval; + } + + /** + * Get the gossip interval. + * + * @return The gossip interval in ms. + */ + public int getGossipInterval() { + return gossipInterval; + } + + /** + * Get the clean interval. + * + * @return The cleanup interval. + */ + public int getCleanupInterval() { + return cleanupInterval; + } + + public int getMinimumSamples() { + return minimumSamples; + } + + public void setMinimumSamples(int minimumSamples) { + this.minimumSamples = minimumSamples; + } + + public int getWindowSize() { + return windowSize; + } + + public void setWindowSize(int windowSize) { + this.windowSize = windowSize; + } + + public double getConvictThreshold() { + return convictThreshold; + } + + public void setConvictThreshold(double convictThreshold) { + this.convictThreshold = convictThreshold; + } + + public void setGossipInterval(int gossipInterval) { + this.gossipInterval = gossipInterval; + } + + public String getDistribution() { + return distribution; + } + + public void setDistribution(String distribution) { + this.distribution = distribution; + } + + public String getActiveGossipClass() { + return activeGossipClass; + } + + public void setActiveGossipClass(String activeGossipClass) { + this.activeGossipClass = activeGossipClass; + } + + public Map<String, String> getActiveGossipProperties() { + return activeGossipProperties; + } + + public void setActiveGossipProperties(Map<String, String> activeGossipProperties) { + this.activeGossipProperties = activeGossipProperties; + } + + public String getPathToRingState() { + return pathToRingState; + } + + public void setPathToRingState(String pathToRingState) { + this.pathToRingState = pathToRingState; + } + + public boolean isPersistRingState() { + return persistRingState; + } + + public void setPersistRingState(boolean persistRingState) { + this.persistRingState = persistRingState; + } + + public String getPathToDataState() { + return pathToDataState; + } + + public void setPathToDataState(String pathToDataState) { + this.pathToDataState = pathToDataState; + } + + public boolean isPersistDataState() { + return persistDataState; + } + + public void setPersistDataState(boolean persistDataState) { + this.persistDataState = persistDataState; + } + + public String getPathToKeyStore() { + return pathToKeyStore; + } + + public void setPathToKeyStore(String pathToKeyStore) { + this.pathToKeyStore = pathToKeyStore; + } + + public boolean isSignMessages() { + return signMessages; + } + + public void setSignMessages(boolean signMessages) { + this.signMessages = signMessages; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/LocalMember.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/LocalMember.java b/gossip-base/src/main/java/org/apache/gossip/LocalMember.java new file mode 100644 index 0000000..450bce5 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/LocalMember.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip; + +import java.net.URI; +import java.util.Map; + +import org.apache.gossip.accrual.FailureDetector; + +/** + * This object represent a gossip member with the properties known locally. These objects are stored + * in the local list of gossip members. + * + */ +public class LocalMember extends Member { + /** The failure detector for this member */ + private transient FailureDetector detector; + + /** + * + * @param uri + * The uri of the member + * @param id + * id of the node + * @param heartbeat + * The current heartbeat + */ + public LocalMember(String clusterName, URI uri, String id, + long heartbeat, Map<String,String> properties, int windowSize, int minSamples, String distribution) { + super(clusterName, uri, id, heartbeat, properties ); + detector = new FailureDetector(minSamples, windowSize, distribution); + } + + protected LocalMember(){ + + } + + public void recordHeartbeat(long now){ + detector.recordHeartbeat(now); + } + + public Double detect(long now) { + return detector.computePhiMeasure(now); + } + + @Override + public String toString() { + Double d = null; + try { + d = detect(System.nanoTime()); + } catch (RuntimeException ex) {} + return "LocalGossipMember [uri=" + uri + ", heartbeat=" + heartbeat + ", clusterName=" + + clusterName + ", id=" + id + ", currentdetect=" + d +" ]"; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/Member.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/Member.java b/gossip-base/src/main/java/org/apache/gossip/Member.java new file mode 100644 index 0000000..d04a7b6 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/Member.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip; + +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.Map; + +/** + * A abstract class representing a gossip member. + * + */ +public abstract class Member implements Comparable<Member> { + + + protected URI uri; + + protected volatile long heartbeat; + + protected String clusterName; + + /** + * The purpose of the id field is to be able for nodes to identify themselves beyond their + * host/port. For example an application might generate a persistent id so if they rejoin the + * cluster at a different host and port we are aware it is the same node. + */ + protected String id; + + /* properties provided at startup time */ + protected Map<String,String> properties; + + /** + * Constructor. + * + * @param clusterName + * The name of the cluster + * @param uri + * A URI object containing IP/hostname and port + * @param heartbeat + * The current heartbeat + * @param id + * An id that may be replaced after contact + */ + public Member(String clusterName, URI uri, String id, long heartbeat, Map<String,String> properties) { + this.clusterName = clusterName; + this.id = id; + this.heartbeat = heartbeat; + this.uri = uri; + this.properties = properties; + } + + protected Member(){} + /** + * Get the name of the cluster the member belongs to. + * + * @return The cluster name + */ + public String getClusterName() { + return clusterName; + } + + + /** + * @return The member address in the form IP/host:port Similar to the toString in + * {@link InetSocketAddress} + */ + public String computeAddress() { + return uri.getHost() + ":" + uri.getPort(); + } + + /** + * Get the heartbeat of this gossip member. + * + * @return The current heartbeat. + */ + public long getHeartbeat() { + return heartbeat; + } + + /** + * Set the heartbeat of this gossip member. + * + * @param heartbeat + * The new heartbeat. + */ + public void setHeartbeat(long heartbeat) { + this.heartbeat = heartbeat; + } + + public String getId() { + return id; + } + + public void setId(String _id) { + this.id = _id; + } + + public Map<String, String> getProperties() { + return properties; + } + + public void setProperties(Map<String, String> properties) { + this.properties = properties; + } + + public String toString() { + return "Member [address=" + computeAddress() + ", id=" + id + ", heartbeat=" + heartbeat + "]"; + } + + /** + * @see java.lang.Object#hashCode() + */ + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + String address = computeAddress(); + result = prime * result + ((address == null) ? 0 : address.hashCode()) + (clusterName == null ? 0 + : clusterName.hashCode()); + return result; + } + + public URI getUri() { + return uri; + } + + /** + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + System.err.println("equals(): obj is null."); + return false; + } + if (!(obj instanceof Member)) { + System.err.println("equals(): obj is not of type GossipMember."); + return false; + } + // The object is the same of they both have the same address (hostname and port). + return computeAddress().equals(((LocalMember) obj).computeAddress()) + && getClusterName().equals(((LocalMember) obj).getClusterName()); + } + + public int compareTo(Member other) { + return this.computeAddress().compareTo(other.computeAddress()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/RemoteMember.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/RemoteMember.java b/gossip-base/src/main/java/org/apache/gossip/RemoteMember.java new file mode 100644 index 0000000..6b42da2 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/RemoteMember.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; + +/** + * The object represents a gossip member with the properties as received from a remote gossip + * member. + * + */ +public class RemoteMember extends Member { + + /** + * Constructor. + * + * @param uri + * A URI object containing IP/hostname and port + * @param heartbeat + * The current heartbeat + */ + public RemoteMember(String clusterName, URI uri, String id, long heartbeat, Map<String,String> properties) { + super(clusterName, uri, id, heartbeat, properties); + } + + public RemoteMember(String clusterName, URI uri, String id) { + super(clusterName, uri, id, System.nanoTime(), new HashMap<String,String>()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/StartupSettings.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/StartupSettings.java b/gossip-base/src/main/java/org/apache/gossip/StartupSettings.java new file mode 100644 index 0000000..17eaaf2 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/StartupSettings.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.log4j.Logger; + + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * This object represents the settings used when starting the gossip service. + * + */ +public class StartupSettings { + private static final Logger log = Logger.getLogger(StartupSettings.class); + + /** The id to use fo the service */ + private String id; + + private URI uri; + + private String cluster; + + /** The gossip settings used at startup. */ + private final GossipSettings gossipSettings; + + /** The list with gossip members to start with. */ + private final List<Member> gossipMembers; + + /** + * Constructor. + * + * @param id + * The id to be used for this service + * @param uri + * A URI object containing IP/hostname and port + * @param logLevel + * unused + */ + public StartupSettings(String id, URI uri, int logLevel, String cluster) { + this(id, uri, new GossipSettings(), cluster); + } + + public URI getUri() { + return uri; + } + + public void setUri(URI uri) { + this.uri = uri; + } + + /** + * Constructor. + * + * @param id + * The id to be used for this service + * @param uri + * A URI object containing IP/hostname and port + */ + public StartupSettings(String id, URI uri, GossipSettings gossipSettings, String cluster) { + this.id = id; + this.uri = uri; + this.gossipSettings = gossipSettings; + this.setCluster(cluster); + gossipMembers = new ArrayList<>(); + } + + public void setCluster(String cluster) { + this.cluster = cluster; + } + + public String getCluster() { + return cluster; + } + + /** + * Set the id to be used for this service. + * + * @param id + * The id for this service. + */ + public void setId(String id) { + this.id = id; + } + + /** + * Get the id for this service. + * + * @return the service's id. + */ + public String getId() { + return id; + } + + /** + * Get the GossipSettings. + * + * @return The GossipSettings object. + */ + public GossipSettings getGossipSettings() { + return gossipSettings; + } + + /** + * Add a gossip member to the list of members to start with. + * + * @param member + * The member to add. + */ + public void addGossipMember(Member member) { + gossipMembers.add(member); + } + + /** + * Get the list with gossip members. + * + * @return The gossip members. + */ + public List<Member> getGossipMembers() { + return gossipMembers; + } + + /** + * Parse the settings for the gossip service from a JSON file. + * + * @param jsonFile + * The file object which refers to the JSON config file. + * @return The StartupSettings object with the settings from the config file. + * @throws FileNotFoundException + * Thrown when the file cannot be found. + * @throws IOException + * Thrown when reading the file gives problems. + * @throws URISyntaxException + */ + public static StartupSettings fromJSONFile(File jsonFile) throws + FileNotFoundException, IOException, URISyntaxException { + ObjectMapper om = new ObjectMapper(); + JsonNode root = om.readTree(jsonFile); + JsonNode jsonObject = root.get(0); + String uri = jsonObject.get("uri").textValue(); + String id = jsonObject.get("id").textValue(); + Map<String,String> properties = new HashMap<String,String>(); + JsonNode n = jsonObject.get("properties"); + Iterator<Entry<String, JsonNode>> l = n.fields(); + while (l.hasNext()){ + Entry<String, JsonNode> i = l.next(); + properties.put(i.getKey(), i.getValue().asText()); + } + //TODO constants as defaults? + int gossipInterval = jsonObject.get("gossip_interval").intValue(); + int cleanupInterval = jsonObject.get("cleanup_interval").intValue(); + int windowSize = jsonObject.get("window_size").intValue(); + int minSamples = jsonObject.get("minimum_samples").intValue(); + double convictThreshold = jsonObject.get("convict_threshold").asDouble(); + String cluster = jsonObject.get("cluster").textValue(); + String distribution = jsonObject.get("distribution").textValue(); + if (cluster == null){ + throw new IllegalArgumentException("cluster was null. It is required"); + } + URI uri2 = new URI(uri); + StartupSettings settings = new StartupSettings(id, uri2, + new GossipSettings(gossipInterval, cleanupInterval, windowSize, + minSamples, convictThreshold, distribution), cluster); + String configMembersDetails = "Config-members ["; + JsonNode membersJSON = jsonObject.get("members"); + Iterator<JsonNode> it = membersJSON.iterator(); + while (it.hasNext()){ + JsonNode child = it.next(); + URI uri3 = new URI(child.get("uri").textValue()); + RemoteMember member = new RemoteMember(child.get("cluster").asText(), + uri3, "", 0, new HashMap<String,String>()); + settings.addGossipMember(member); + configMembersDetails += member.computeAddress(); + configMembersDetails += ", "; + } + log.info(configMembersDetails + "]"); + return settings; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/accrual/FailureDetector.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/accrual/FailureDetector.java b/gossip-base/src/main/java/org/apache/gossip/accrual/FailureDetector.java new file mode 100644 index 0000000..5abd5c6 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/accrual/FailureDetector.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.accrual; + +import org.apache.commons.math.MathException; +import org.apache.commons.math.distribution.ExponentialDistributionImpl; +import org.apache.commons.math.distribution.NormalDistributionImpl; +import org.apache.commons.math.stat.descriptive.DescriptiveStatistics; +import org.apache.log4j.Logger; + +public class FailureDetector { + + public static final Logger LOGGER = Logger.getLogger(FailureDetector.class); + private final DescriptiveStatistics descriptiveStatistics; + private final long minimumSamples; + private volatile long latestHeartbeatMs = -1; + private final String distribution; + + public FailureDetector(long minimumSamples, int windowSize, String distribution) { + descriptiveStatistics = new DescriptiveStatistics(windowSize); + this.minimumSamples = minimumSamples; + this.distribution = distribution; + } + + /** + * Updates the statistics based on the delta between the last + * heartbeat and supplied time + * + * @param now the time of the heartbeat in milliseconds + */ + public synchronized void recordHeartbeat(long now) { + if (now <= latestHeartbeatMs) { + return; + } + if (latestHeartbeatMs != -1) { + descriptiveStatistics.addValue(now - latestHeartbeatMs); + } + latestHeartbeatMs = now; + } + + public synchronized Double computePhiMeasure(long now) { + if (latestHeartbeatMs == -1 || descriptiveStatistics.getN() < minimumSamples) { + return null; + } + long delta = now - latestHeartbeatMs; + try { + double probability; + if (distribution.equals("normal")) { + double standardDeviation = descriptiveStatistics.getStandardDeviation(); + standardDeviation = standardDeviation < 0.1 ? 0.1 : standardDeviation; + probability = new NormalDistributionImpl(descriptiveStatistics.getMean(), standardDeviation).cumulativeProbability(delta); + } else { + probability = new ExponentialDistributionImpl(descriptiveStatistics.getMean()).cumulativeProbability(delta); + } + final double eps = 1e-12; + if (1 - probability < eps) { + probability = 1.0; + } + return -1.0d * Math.log10(1.0d - probability); + } catch (MathException | IllegalArgumentException e) { + LOGGER.debug(e); + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/crdt/Crdt.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/Crdt.java b/gossip-base/src/main/java/org/apache/gossip/crdt/Crdt.java new file mode 100644 index 0000000..8edfa8c --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/crdt/Crdt.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.crdt; +/** + * + * Immutable type + * + * @param <SetType> + * @param <MergeReturnType> + */ +public interface Crdt<SetType, MergeReturnType extends Crdt<SetType, MergeReturnType>> { + + + MergeReturnType merge(MergeReturnType other); + SetType value(); + /** + * Called to self optimize. Some CRDTs may use some mechanism to clean up be + * removing obsolete data outside the scope of merging. IE this could clean up + * temporal values, old copies etc. + * @return the Crdt structure optimized + */ + MergeReturnType optimize(); + +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtBiFunctionMerge.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtBiFunctionMerge.java b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtBiFunctionMerge.java new file mode 100644 index 0000000..1ac7a30 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtBiFunctionMerge.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.crdt; + +import java.util.function.BiFunction; + +@SuppressWarnings("rawtypes") +public class CrdtBiFunctionMerge implements BiFunction<Crdt,Crdt,Crdt> { + + @SuppressWarnings("unchecked") + @Override + public Crdt apply(Crdt t, Crdt u) { + if (t == null && u == null){ + return null; + } else if (t == null){ + return u; + } else if (u == null){ + return t; + } + if (! u.getClass().equals(t.getClass())){ + throw new IllegalArgumentException( "Can not merge " + t.getClass() + " "+ u.getClass()); + } + return t.merge(u); + } + + @SuppressWarnings("unchecked") + public static Crdt applyStatic(Crdt t, Crdt u){ + if (t == null && u == null){ + return null; + } else if (t == null){ + return u; + } else if (u == null){ + return t; + } + if (! u.getClass().equals(t.getClass())){ + throw new IllegalArgumentException( "Can not merge " + t.getClass() + " "+ u.getClass()); + } + return t.merge(u); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtCounter.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtCounter.java b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtCounter.java new file mode 100644 index 0000000..cdc9445 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtCounter.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.crdt; + +public interface CrdtCounter<ValueType extends Number, R extends CrdtCounter<ValueType, R>> + extends Crdt<ValueType, R> { + +} + http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java new file mode 100644 index 0000000..cfb3f47 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.crdt; + +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.Version; +import com.fasterxml.jackson.databind.module.SimpleModule; + +abstract class OrSetMixin<E> { + @JsonCreator + OrSetMixin(@JsonProperty("elements") Map<E, Set<UUID>> w, @JsonProperty("tombstones") Map<E, Set<UUID>> h) { } + @JsonProperty("elements") abstract Map<E, Set<UUID>> getElements(); + @JsonProperty("tombstones") abstract Map<E, Set<UUID>> getTombstones(); + @JsonIgnore abstract boolean isEmpty(); +} + +abstract class GrowOnlySetMixin<E>{ + @JsonCreator + GrowOnlySetMixin(@JsonProperty("elements") Set<E> elements){ } + @JsonProperty("elements") abstract Set<E> getElements(); + @JsonIgnore abstract boolean isEmpty(); +} + +abstract class GrowOnlyCounterMixin { + @JsonCreator + GrowOnlyCounterMixin(@JsonProperty("counters") Map<String, Long> counters) { } + @JsonProperty("counters") abstract Map<String, Long> getCounters(); +} + +//If anyone wants to take a stab at this. please have at it +//https://github.com/FasterXML/jackson-datatype-guava/blob/master/src/main/java/com/fasterxml/jackson/datatype/guava/ser/MultimapSerializer.java +public class CrdtModule extends SimpleModule { + + private static final long serialVersionUID = 6134836523275023418L; + + public CrdtModule() { + super("CrdtModule", new Version(0, 0, 0, "0.0.0", "org.apache.gossip", "gossip")); + } + + @Override + public void setupModule(SetupContext context) { + context.setMixInAnnotations(OrSet.class, OrSetMixin.class); + context.setMixInAnnotations(GrowOnlySet.class, GrowOnlySetMixin.class); + context.setMixInAnnotations(GrowOnlyCounter.class, GrowOnlyCounterMixin.class); + } + +} + http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtSet.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtSet.java b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtSet.java new file mode 100644 index 0000000..21b41da --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtSet.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.crdt; + +import java.util.Set; + +public interface CrdtSet<ElementType, SetType extends Set<ElementType>, R extends CrdtSet<ElementType, SetType, R>> +extends Crdt<SetType, R> { + +} + http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/crdt/GrowOnlyCounter.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/GrowOnlyCounter.java b/gossip-base/src/main/java/org/apache/gossip/crdt/GrowOnlyCounter.java new file mode 100644 index 0000000..dd1505a --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/crdt/GrowOnlyCounter.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gossip.crdt; + +import org.apache.gossip.manager.GossipManager; + +import java.util.HashMap; +import java.util.Map; + +public class GrowOnlyCounter implements CrdtCounter<Long, GrowOnlyCounter> { + + private final Map<String, Long> counters = new HashMap<>(); + + GrowOnlyCounter(Map<String, Long> counters) { + this.counters.putAll(counters); + } + + public GrowOnlyCounter(GrowOnlyCounter growOnlyCounter, Builder builder) { + counters.putAll(growOnlyCounter.counters); + if (counters.containsKey(builder.myId)) { + Long newValue = counters.get(builder.myId) + builder.counter; + counters.replace(builder.myId, newValue); + } else { + counters.put(builder.myId, builder.counter); + } + } + + public GrowOnlyCounter(Builder builder) { + counters.put(builder.myId, builder.counter); + } + + public GrowOnlyCounter(GossipManager manager) { + counters.put(manager.getMyself().getId(), 0L); + } + + public GrowOnlyCounter(GrowOnlyCounter growOnlyCounter, GrowOnlyCounter other) { + counters.putAll(growOnlyCounter.counters); + for (Map.Entry<String, Long> entry : other.counters.entrySet()) { + String otherKey = entry.getKey(); + Long otherValue = entry.getValue(); + + if (counters.containsKey(otherKey)) { + Long newValue = Math.max(counters.get(otherKey), otherValue); + counters.replace(otherKey, newValue); + } else { + counters.put(otherKey, otherValue); + } + } + } + + @Override + public GrowOnlyCounter merge(GrowOnlyCounter other) { + return new GrowOnlyCounter(this, other); + } + + @Override + public Long value() { + Long globalCount = 0L; + for (Long increment : counters.values()) { + globalCount += increment; + } + return globalCount; + } + + @Override + public GrowOnlyCounter optimize() { + return new GrowOnlyCounter(counters); + } + + @Override + public boolean equals(Object obj) { + if (getClass() != obj.getClass()) + return false; + GrowOnlyCounter other = (GrowOnlyCounter) obj; + return value().longValue() == other.value().longValue(); + } + + @Override + public String toString() { + return "GrowOnlyCounter [counters= " + counters + ", Value=" + value() + "]"; + } + + Map<String, Long> getCounters() { + return counters; + } + + public static class Builder { + + private final String myId; + + private Long counter; + + public Builder(GossipManager gossipManager) { + myId = gossipManager.getMyself().getId(); + counter = 0L; + } + + public GrowOnlyCounter.Builder increment(Long count) { + counter += count; + return this; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/crdt/GrowOnlySet.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/GrowOnlySet.java b/gossip-base/src/main/java/org/apache/gossip/crdt/GrowOnlySet.java new file mode 100644 index 0000000..9e2dd49 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/crdt/GrowOnlySet.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.crdt; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.Set; + +public class GrowOnlySet<ElementType> implements CrdtSet<ElementType, Set<ElementType>, GrowOnlySet<ElementType>>{ + + private final Set<ElementType> hidden = new LinkedHashSet<>(); + + @SuppressWarnings("unused") + /* + * Used by SerDe + */ + private GrowOnlySet(){ + + } + + public GrowOnlySet(Set<ElementType> c){ + hidden.addAll(c); + } + + public GrowOnlySet(Collection<ElementType> c){ + hidden.addAll(c); + } + + public GrowOnlySet(GrowOnlySet<ElementType> first, GrowOnlySet<ElementType> second){ + hidden.addAll(first.value()); + hidden.addAll(second.value()); + } + + @Override + public GrowOnlySet<ElementType> merge(GrowOnlySet<ElementType> other) { + return new GrowOnlySet<>(this, other); + } + + @Override + public Set<ElementType> value() { + Set<ElementType> copy = new LinkedHashSet<>(); + copy.addAll(hidden); + return Collections.unmodifiableSet(copy); + } + + @Override + public GrowOnlySet<ElementType> optimize() { + return new GrowOnlySet<>(hidden); + } + + public int size() { + return hidden.size(); + } + + public boolean isEmpty() { + return hidden.isEmpty(); + } + + public boolean contains(Object o) { + return hidden.contains(o); + } + + public Iterator<ElementType> iterator() { + Set<ElementType> copy = new HashSet<>(); + copy.addAll(hidden); + return copy.iterator(); + } + + public Object[] toArray() { + return hidden.toArray(); + } + + public <T> T[] toArray(T[] a) { + return hidden.toArray(a); + } + + public boolean add(ElementType e) { + throw new UnsupportedOperationException(); + } + + public boolean remove(Object o) { + throw new UnsupportedOperationException(); + } + + public boolean containsAll(Collection<?> c) { + return hidden.containsAll(c); + } + + public boolean addAll(Collection<? extends ElementType> c) { + throw new UnsupportedOperationException(); + } + + public boolean retainAll(Collection<?> c) { + throw new UnsupportedOperationException(); + } + + public boolean removeAll(Collection<?> c) { + throw new UnsupportedOperationException(); + } + + public void clear() { + throw new UnsupportedOperationException(); + } + + @Override + public String toString() { + return "GrowOnlySet [hidden=" + hidden + "]"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((hidden == null) ? 0 : hidden.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + @SuppressWarnings("rawtypes") + GrowOnlySet other = (GrowOnlySet) obj; + if (hidden == null) { + if (other.hidden != null) + return false; + } else if (!hidden.equals(other.hidden)) + return false; + return true; + } + + Set<ElementType> getElements(){ + return hidden; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/crdt/OrSet.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/OrSet.java b/gossip-base/src/main/java/org/apache/gossip/crdt/OrSet.java new file mode 100644 index 0000000..f84dbc7 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/crdt/OrSet.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.crdt; + +import java.util.*; +import java.util.Map.Entry; +import java.util.function.BiConsumer; + +import org.apache.gossip.crdt.OrSet.Builder.Operation; + +/* + * A immutable set + */ +public class OrSet<E> implements Crdt<Set<E>, OrSet<E>> { + + private final Map<E, Set<UUID>> elements = new HashMap<>(); + private final Map<E, Set<UUID>> tombstones = new HashMap<>(); + private final transient Set<E> val; + + public OrSet(){ + val = computeValue(); + } + + OrSet(Map<E, Set<UUID>> elements, Map<E, Set<UUID>> tombstones){ + this.elements.putAll(elements); + this.tombstones.putAll(tombstones); + val = computeValue(); + } + + @SafeVarargs + public OrSet(E ... elements){ + for (E e: elements){ + internalAdd(e); + } + val = computeValue(); + } + + public OrSet(Builder<E>builder){ + for (Builder<E>.OrSetElement<E> e: builder.elements){ + if (e.operation == Operation.ADD){ + internalAdd(e.element); + } else { + internalRemove(e.element); + } + } + val = computeValue(); + } + + /** + * This constructor is the way to remove elements from an existing set + * @param set + * @param builder + */ + public OrSet(OrSet<E> set, Builder<E> builder){ + elements.putAll(set.elements); + tombstones.putAll(set.tombstones); + for (Builder<E>.OrSetElement<E> e: builder.elements){ + if (e.operation == Operation.ADD){ + internalAdd(e.element); + } else { + internalRemove(e.element); + } + } + val = computeValue(); + } + + static Set<UUID> mergeSets(Set<UUID> a, Set<UUID> b) { + if ((a == null || a.isEmpty()) && (b == null || b.isEmpty())) { + return null; + } + Set<UUID> res = new HashSet<>(a); + res.addAll(b); + return res; + } + + private void internalSetMerge(Map<E, Set<UUID>> map, E key, Set<UUID> value) { + if (value == null) { + return; + } + map.merge(key, value, OrSet::mergeSets); + } + + public OrSet(OrSet<E> left, OrSet<E> right){ + BiConsumer<Map<E, Set<UUID>>, Map<E, Set<UUID>>> internalMerge = (items, other) -> { + for (Entry<E, Set<UUID>> l : other.entrySet()){ + internalSetMerge(items, l.getKey(), l.getValue()); + } + }; + + internalMerge.accept(elements, left.elements); + internalMerge.accept(elements, right.elements); + internalMerge.accept(tombstones, left.tombstones); + internalMerge.accept(tombstones, right.tombstones); + + val = computeValue(); + } + + public OrSet.Builder<E> builder(){ + return new OrSet.Builder<>(); + } + + @Override + public OrSet<E> merge(OrSet<E> other) { + return new OrSet<E>(this, other); + } + + private void internalAdd(E element) { + Set<UUID> toMerge = new HashSet<>(); + toMerge.add(UUID.randomUUID()); + internalSetMerge(elements, element, toMerge); + } + + private void internalRemove(E element){ + internalSetMerge(tombstones, element, elements.get(element)); + } + + /* + * Computes the live values by analyzing the elements and tombstones + */ + private Set<E> computeValue(){ + Set<E> values = new HashSet<>(); + for (Entry<E, Set<UUID>> entry: elements.entrySet()){ + Set<UUID> deleteIds = tombstones.get(entry.getKey()); + // if not all tokens for current element are in tombstones + if (deleteIds == null || !deleteIds.containsAll(entry.getValue())) { + values.add(entry.getKey()); + } + } + return values; + } + + @Override + public Set<E> value() { + return val; + } + + @Override + public OrSet<E> optimize() { + return this; + } + + public static class Builder<E> { + public static enum Operation { + ADD, REMOVE + }; + + private class OrSetElement<EL> { + EL element; + Operation operation; + + private OrSetElement(EL element, Operation operation) { + this.element = element; + this.operation = operation; + } + } + + private List<OrSetElement<E>> elements = new ArrayList<>(); + + public Builder<E> add(E element) { + elements.add(new OrSetElement<E>(element, Operation.ADD)); + return this; + } + + public Builder<E> remove(E element) { + elements.add(new OrSetElement<E>(element, Operation.REMOVE)); + return this; + } + + public Builder<E> mutate(E element, Operation operation) { + elements.add(new OrSetElement<E>(element, operation)); + return this; + } + } + + + public int size() { + return value().size(); + } + + + public boolean isEmpty() { + return value().size() == 0; + } + + + public boolean contains(Object o) { + return value().contains(o); + } + + + public Iterator<E> iterator() { + Iterator<E> managed = value().iterator(); + return new Iterator<E>() { + + @Override + public void remove() { + throw new IllegalArgumentException(); + } + + @Override + public boolean hasNext() { + return managed.hasNext(); + } + + @Override + public E next() { + return managed.next(); + } + + }; + } + + public Object[] toArray() { + return value().toArray(); + } + + public <T> T[] toArray(T[] a) { + return value().toArray(a); + } + + public boolean add(E e) { + throw new IllegalArgumentException("Can not add"); + } + + + public boolean remove(Object o) { + throw new IllegalArgumentException(); + } + + public boolean containsAll(Collection<?> c) { + return this.value().containsAll(c); + } + + public boolean addAll(Collection<? extends E> c) { + throw new IllegalArgumentException(); + } + + public boolean retainAll(Collection<?> c) { + throw new IllegalArgumentException(); + } + + public boolean removeAll(Collection<?> c) { + throw new IllegalArgumentException(); + } + + public void clear() { + throw new IllegalArgumentException(); + } + + @Override + public String toString() { + return "OrSet [elements=" + elements + ", tombstones=" + tombstones + "]" ; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((value() == null) ? 0 : value().hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + @SuppressWarnings("rawtypes") + OrSet other = (OrSet) obj; + if (elements == null) { + if (other.elements != null) + return false; + } else if (!value().equals(other.value())) + return false; + return true; + } + + Map<E, Set<UUID>> getElements() { + return elements; + } + + Map<E, Set<UUID>> getTombstones() { + return tombstones; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/event/GossipListener.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/event/GossipListener.java b/gossip-base/src/main/java/org/apache/gossip/event/GossipListener.java new file mode 100644 index 0000000..9b33dab --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/event/GossipListener.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.event; + +import org.apache.gossip.Member; + +public interface GossipListener { + void gossipEvent(Member member, GossipState state); +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/event/GossipState.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/event/GossipState.java b/gossip-base/src/main/java/org/apache/gossip/event/GossipState.java new file mode 100644 index 0000000..3b76c9e --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/event/GossipState.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.event; + +public enum GossipState { + UP("up"), DOWN("down"); + @SuppressWarnings("unused") + private final String state; + + private GossipState(String state) { + this.state = state; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java b/gossip-base/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java new file mode 100644 index 0000000..497894c --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gossip.examples; + +import java.net.URI; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import org.apache.gossip.GossipSettings; +import org.apache.gossip.RemoteMember; +import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.GossipManagerBuilder; + +public class StandAloneDatacenterAndRack { + + public static void main (String [] args) throws UnknownHostException, InterruptedException { + GossipSettings s = new GossipSettings(); + s.setWindowSize(1000); + s.setGossipInterval(100); + s.setActiveGossipClass(DatacenterRackAwareActiveGossiper.class.getName()); + Map<String, String> gossipProps = new HashMap<>(); + gossipProps.put("sameRackGossipIntervalMs", "2000"); + gossipProps.put("differentDatacenterGossipIntervalMs", "10000"); + s.setActiveGossipProperties(gossipProps); + Map<String, String> props = new HashMap<>(); + props.put(DatacenterRackAwareActiveGossiper.DATACENTER, args[4]); + props.put(DatacenterRackAwareActiveGossiper.RACK, args[5]); + GossipManager manager = GossipManagerBuilder.newBuilder() + .cluster("mycluster") + .uri(URI.create(args[0])) + .id(args[1]) + .gossipSettings(s) + .gossipMembers(Arrays.asList(new RemoteMember("mycluster", URI.create(args[2]), args[3]))) + .properties(props) + .build(); + manager.init(); + while (true){ + System.out.println("Live: " + manager.getLiveMembers()); + System.out.println("Dead: " + manager.getDeadMembers()); + Thread.sleep(2000); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/examples/StandAloneNode.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/examples/StandAloneNode.java b/gossip-base/src/main/java/org/apache/gossip/examples/StandAloneNode.java new file mode 100644 index 0000000..93421b1 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/examples/StandAloneNode.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.examples; + +import java.net.URI; +import java.net.UnknownHostException; +import java.util.Arrays; +import org.apache.gossip.GossipSettings; +import org.apache.gossip.RemoteMember; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.GossipManagerBuilder; + +public class StandAloneNode { + public static void main (String [] args) throws UnknownHostException, InterruptedException{ + GossipSettings s = new GossipSettings(); + s.setWindowSize(1000); + s.setGossipInterval(100); + GossipManager gossipService = GossipManagerBuilder.newBuilder() + .cluster("mycluster") + .uri(URI.create(args[0])) + .id(args[1]) + .gossipMembers(Arrays.asList( new RemoteMember("mycluster", URI.create(args[2]), args[3]))) + .gossipSettings(s) + .build(); + gossipService.init(); + while (true){ + System.out.println("Live: " + gossipService.getLiveMembers()); + System.out.println("Dead: " + gossipService.getDeadMembers()); + Thread.sleep(2000); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java b/gossip-base/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java new file mode 100644 index 0000000..d78cf5e --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.examples; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.URI; +import java.util.Arrays; +import org.apache.gossip.GossipSettings; +import org.apache.gossip.RemoteMember; +import org.apache.gossip.crdt.GrowOnlyCounter; +import org.apache.gossip.crdt.OrSet; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.GossipManagerBuilder; +import org.apache.gossip.model.SharedDataMessage; + +public class StandAloneNodeCrdtOrSet { + public static void main (String [] args) throws InterruptedException, IOException{ + GossipSettings s = new GossipSettings(); + s.setWindowSize(1000); + s.setGossipInterval(100); + GossipManager gossipService = GossipManagerBuilder.newBuilder() + .cluster("mycluster") + .uri(URI.create(args[0])) + .id(args[1]) + .gossipMembers(Arrays.asList( new RemoteMember("mycluster", URI.create(args[2]), args[3]))) + .gossipSettings(s) + .build(); + gossipService.init(); + + new Thread(() -> { + while (true){ + System.out.println("Live: " + gossipService.getLiveMembers()); + System.out.println("Dead: " + gossipService.getDeadMembers()); + System.out.println("---------- " + (gossipService.findCrdt("abc") == null ? "": + gossipService.findCrdt("abc").value())); + System.out.println("********** " + gossipService.findCrdt("abc")); + System.out.println("^^^^^^^^^^ " + (gossipService.findCrdt("def") == null ? "": + gossipService.findCrdt("def").value())); + System.out.println("$$$$$$$$$$ " + gossipService.findCrdt("def")); + try { + Thread.sleep(2000); + } catch (Exception e) {} + } + }).start(); + + String line = null; + try (BufferedReader br = new BufferedReader(new InputStreamReader(System.in))){ + while ( (line = br.readLine()) != null){ + System.out.println(line); + char op = line.charAt(0); + String val = line.substring(2); + if (op == 'a'){ + addData(val, gossipService); + } else if (op == 'r') { + removeData(val, gossipService); + } else if (op == 'g'){ + gcount(val, gossipService); + } + } + } + } + + private static void gcount(String val, GossipManager gossipManager){ + GrowOnlyCounter c = (GrowOnlyCounter) gossipManager.findCrdt("def"); + Long l = Long.valueOf(val); + if (c == null){ + c = new GrowOnlyCounter(new GrowOnlyCounter.Builder(gossipManager).increment((l))); + } else { + c = new GrowOnlyCounter(c, new GrowOnlyCounter.Builder(gossipManager).increment((l))); + } + SharedDataMessage m = new SharedDataMessage(); + m.setExpireAt(Long.MAX_VALUE); + m.setKey("def"); + m.setPayload(c); + m.setTimestamp(System.currentTimeMillis()); + gossipManager.merge(m); + } + + private static void removeData(String val, GossipManager gossipService){ + @SuppressWarnings("unchecked") + OrSet<String> s = (OrSet<String>) gossipService.findCrdt("abc"); + SharedDataMessage m = new SharedDataMessage(); + m.setExpireAt(Long.MAX_VALUE); + m.setKey("abc"); + m.setPayload(new OrSet<String>(s , new OrSet.Builder<String>().remove(val))); + m.setTimestamp(System.currentTimeMillis()); + gossipService.merge(m); + } + + private static void addData(String val, GossipManager gossipService){ + SharedDataMessage m = new SharedDataMessage(); + m.setExpireAt(Long.MAX_VALUE); + m.setKey("abc"); + m.setPayload(new OrSet<String>(val)); + m.setTimestamp(System.currentTimeMillis()); + gossipService.merge(m); + } +}
