GOSSIP-46 Refactor away GossipService cleaner better class names
Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/51998219 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/51998219 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/51998219 Branch: refs/heads/master Commit: 5199821980e979f463103c1b1c87f56ba0fabb3d Parents: d5fe9f9 Author: Edward Capriolo <[email protected]> Authored: Sun Mar 5 15:44:54 2017 -0500 Committer: Edward Capriolo <[email protected]> Committed: Mon Mar 6 17:30:19 2017 -0500 ---------------------------------------------------------------------- .../java/org/apache/gossip/GossipMember.java | 166 ------------------- .../java/org/apache/gossip/GossipService.java | 135 --------------- .../org/apache/gossip/LocalGossipMember.java | 71 -------- .../java/org/apache/gossip/LocalMember.java | 71 ++++++++ src/main/java/org/apache/gossip/Member.java | 166 +++++++++++++++++++ .../org/apache/gossip/RemoteGossipMember.java | 47 ------ .../java/org/apache/gossip/RemoteMember.java | 47 ++++++ .../java/org/apache/gossip/StartupSettings.java | 8 +- .../apache/gossip/accrual/FailureDetector.java | 6 +- .../org/apache/gossip/event/GossipListener.java | 4 +- .../apache/gossip/examples/GossipExample.java | 105 ------------ .../examples/StandAloneDatacenterAndRack.java | 26 +-- .../apache/gossip/examples/StandAloneNode.java | 23 +-- .../examples/StandAloneNodeCrdtOrSet.java | 47 +++--- .../gossip/manager/AbstractActiveGossiper.java | 40 ++--- .../org/apache/gossip/manager/DataReaper.java | 12 +- .../DatacenterRackAwareActiveGossiper.java | 24 +-- .../org/apache/gossip/manager/GossipCore.java | 48 +++--- .../apache/gossip/manager/GossipManager.java | 63 ++++--- .../gossip/manager/GossipManagerBuilder.java | 152 +++++++++++++++++ .../gossip/manager/RingStatePersister.java | 6 +- .../gossip/manager/SimpleActiveGossipper.java | 8 +- .../gossip/manager/UserDataPersister.java | 16 +- .../handlers/ActiveGossipMessageHandler.java | 10 +- .../manager/handlers/DefaultMessageInvoker.java | 4 +- .../handlers/GossipDataMessageHandler.java | 31 ---- .../handlers/PerNodeDataMessageHandler.java | 31 ++++ .../handlers/SharedDataMessageHandler.java | 31 ++++ .../SharedGossipDataMessageHandler.java | 31 ---- .../handlers/ShutdownMessageHandler.java | 4 +- .../manager/random/RandomGossipManager.java | 145 ---------------- .../gossip/model/ActiveGossipMessage.java | 6 +- src/main/java/org/apache/gossip/model/Base.java | 12 +- .../apache/gossip/model/GossipDataMessage.java | 66 -------- .../org/apache/gossip/model/GossipMember.java | 87 ---------- .../java/org/apache/gossip/model/Member.java | 87 ++++++++++ .../apache/gossip/model/PerNodeDataMessage.java | 66 ++++++++ .../apache/gossip/model/SharedDataMessage.java | 64 +++++++ .../gossip/model/SharedGossipDataMessage.java | 64 ------- .../apache/gossip/udp/UdpGossipDataMessage.java | 48 ------ .../gossip/udp/UdpPerNodeDataMessage.java | 48 ++++++ .../apache/gossip/udp/UdpSharedDataMessage.java | 50 ++++++ .../gossip/udp/UdpSharedGossipDataMessage.java | 50 ------ .../apache/gossip/AbstractIntegrationBase.java | 50 ++++++ src/test/java/org/apache/gossip/DataTest.java | 86 +++++----- .../org/apache/gossip/GossipMemberTest.java | 40 ----- .../org/apache/gossip/IdAndPropertyTest.java | 46 ++--- src/test/java/org/apache/gossip/MemberTest.java | 40 +++++ .../org/apache/gossip/ShutdownDeadtimeTest.java | 45 +++-- .../org/apache/gossip/SignedMessageTest.java | 55 +++--- .../org/apache/gossip/StartupSettingsTest.java | 25 +-- .../org/apache/gossip/TenNodeThreeSeedTest.java | 29 ++-- .../gossip/accrual/FailureDetectorTest.java | 8 +- .../java/org/apache/gossip/crdt/OrSetTest.java | 21 ++- .../apache/gossip/manager/DataReaperTest.java | 25 ++- .../manager/GossipManagerBuilderTest.java | 121 ++++++++++++++ .../manager/RandomGossipManagerBuilderTest.java | 122 -------------- .../gossip/manager/RingPersistenceTest.java | 37 +++-- .../gossip/manager/UserDataPersistenceTest.java | 61 +++---- .../manager/handlers/MessageInvokerTest.java | 4 +- 60 files changed, 1446 insertions(+), 1595 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/51998219/src/main/java/org/apache/gossip/GossipMember.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/GossipMember.java b/src/main/java/org/apache/gossip/GossipMember.java deleted file mode 100644 index 703ac55..0000000 --- a/src/main/java/org/apache/gossip/GossipMember.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gossip; - -import java.net.InetSocketAddress; -import java.net.URI; -import java.util.Map; - -/** - * A abstract class representing a gossip member. - * - */ -public abstract class GossipMember implements Comparable<GossipMember> { - - - protected URI uri; - - protected volatile long heartbeat; - - protected String clusterName; - - /** - * The purpose of the id field is to be able for nodes to identify themselves beyond their - * host/port. For example an application might generate a persistent id so if they rejoin the - * cluster at a different host and port we are aware it is the same node. - */ - protected String id; - - /* properties provided at startup time */ - protected Map<String,String> properties; - - /** - * Constructor. - * - * @param clusterName - * The name of the cluster - * @param uri - * A URI object containing IP/hostname and port - * @param heartbeat - * The current heartbeat - * @param id - * An id that may be replaced after contact - */ - public GossipMember(String clusterName, URI uri, String id, long heartbeat, Map<String,String> properties) { - this.clusterName = clusterName; - this.id = id; - this.heartbeat = heartbeat; - this.uri = uri; - this.properties = properties; - } - - protected GossipMember(){} - /** - * Get the name of the cluster the member belongs to. - * - * @return The cluster name - */ - public String getClusterName() { - return clusterName; - } - - - /** - * @return The member address in the form IP/host:port Similar to the toString in - * {@link InetSocketAddress} - */ - public String computeAddress() { - return uri.getHost() + ":" + uri.getPort(); - } - - /** - * Get the heartbeat of this gossip member. - * - * @return The current heartbeat. - */ - public long getHeartbeat() { - return heartbeat; - } - - /** - * Set the heartbeat of this gossip member. - * - * @param heartbeat - * The new heartbeat. - */ - public void setHeartbeat(long heartbeat) { - this.heartbeat = heartbeat; - } - - public String getId() { - return id; - } - - public void setId(String _id) { - this.id = _id; - } - - public Map<String, String> getProperties() { - return properties; - } - - public void setProperties(Map<String, String> properties) { - this.properties = properties; - } - - public String toString() { - return "Member [address=" + computeAddress() + ", id=" + id + ", heartbeat=" + heartbeat + "]"; - } - - /** - * @see java.lang.Object#hashCode() - */ - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - String address = computeAddress(); - result = prime * result + ((address == null) ? 0 : address.hashCode()) + (clusterName == null ? 0 - : clusterName.hashCode()); - return result; - } - - public URI getUri() { - return uri; - } - - /** - * @see java.lang.Object#equals(java.lang.Object) - */ - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - System.err.println("equals(): obj is null."); - return false; - } - if (!(obj instanceof GossipMember)) { - System.err.println("equals(): obj is not of type GossipMember."); - return false; - } - // The object is the same of they both have the same address (hostname and port). - return computeAddress().equals(((LocalGossipMember) obj).computeAddress()) - && getClusterName().equals(((LocalGossipMember) obj).getClusterName()); - } - - public int compareTo(GossipMember other) { - return this.computeAddress().compareTo(other.computeAddress()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/51998219/src/main/java/org/apache/gossip/GossipService.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/GossipService.java b/src/main/java/org/apache/gossip/GossipService.java deleted file mode 100644 index f216c33..0000000 --- a/src/main/java/org/apache/gossip/GossipService.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gossip; - -import com.codahale.metrics.MetricRegistry; -import java.net.URI; -import java.net.UnknownHostException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import com.codahale.metrics.JmxReporter; -import org.apache.gossip.event.GossipListener; -import org.apache.gossip.manager.GossipManager; -import org.apache.gossip.manager.random.RandomGossipManager; -import org.apache.gossip.model.GossipDataMessage; -import org.apache.gossip.model.SharedGossipDataMessage; -import org.apache.log4j.Logger; - -/** - * This object represents the service which is responsible for gossiping with other gossip members. - * - */ -public class GossipService { - - public static final Logger LOGGER = Logger.getLogger(GossipService.class); - private final JmxReporter jmxReporter; - - private final GossipManager gossipManager; - - /** - * Constructor with the default settings. - * - * @throws InterruptedException - * @throws UnknownHostException - */ - public GossipService(StartupSettings startupSettings) throws InterruptedException, - UnknownHostException { - this(startupSettings.getCluster(), startupSettings.getUri() - , startupSettings.getId(), new HashMap<String,String> (),startupSettings.getGossipMembers(), - startupSettings.getGossipSettings(), null, new MetricRegistry()); - } - - /** - * Setup the client's lists, gossiping parameters, and parse the startup config file. - * - * @throws InterruptedException - * @throws UnknownHostException - */ - public GossipService(String cluster, URI uri, String id, Map<String,String> properties, - List<GossipMember> gossipMembers, GossipSettings settings, GossipListener listener, MetricRegistry registry) - throws InterruptedException, UnknownHostException { - jmxReporter = JmxReporter.forRegistry(registry).build(); - jmxReporter.start(); - gossipManager = RandomGossipManager.newBuilder() - .withId(id) - .cluster(cluster) - .uri(uri) - .settings(settings) - .gossipMembers(gossipMembers) - .listener(listener) - .registry(registry) - .properties(properties) - .build(); - } - - public void start() { - gossipManager.init(); - } - - public void shutdown() { - gossipManager.shutdown(); - } - - public GossipManager getGossipManager() { - return gossipManager; - } - - /** - * Gossip data in a namespace that is per-node { node-id { key, value } } - * @param message - * message to be gossip'ed across the cluster - */ - public void gossipPerNodeData(GossipDataMessage message){ - gossipManager.gossipPerNodeData(message); - } - - /** - * Retrieve per-node gossip data by key - * - * @param nodeId - * the id of the node that owns the data - * @param key - * the key in the per-node map to find the data - * @return the value if found or null if not found or expired - */ - public GossipDataMessage findPerNodeData(String nodeId, String key){ - return getGossipManager().findPerNodeGossipData(nodeId, key); - } - - /** - * - * @param message - * Shared data to gossip around the cluster - */ - public void gossipSharedData(SharedGossipDataMessage message){ - gossipManager.gossipSharedData(message); - } - - /** - * - * @param key - * the key to search for - * @return the value associated with given key - */ - public SharedGossipDataMessage findSharedData(String key){ - return getGossipManager().findSharedGossipData(key); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/51998219/src/main/java/org/apache/gossip/LocalGossipMember.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/LocalGossipMember.java b/src/main/java/org/apache/gossip/LocalGossipMember.java deleted file mode 100644 index 05874f5..0000000 --- a/src/main/java/org/apache/gossip/LocalGossipMember.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gossip; - -import java.net.URI; -import java.util.Map; - -import org.apache.gossip.accrual.FailureDetector; - -/** - * This object represent a gossip member with the properties known locally. These objects are stored - * in the local list of gossip members. - * - */ -public class LocalGossipMember extends GossipMember { - /** The failure detector for this member */ - private transient FailureDetector detector; - - /** - * - * @param uri - * The uri of the member - * @param id - * id of the node - * @param heartbeat - * The current heartbeat - */ - public LocalGossipMember(String clusterName, URI uri, String id, - long heartbeat, Map<String,String> properties, int windowSize, int minSamples, String distribution) { - super(clusterName, uri, id, heartbeat, properties ); - detector = new FailureDetector(this, minSamples, windowSize, distribution); - } - - protected LocalGossipMember(){ - - } - - public void recordHeartbeat(long now){ - detector.recordHeartbeat(now); - } - - public Double detect(long now) { - return detector.computePhiMeasure(now); - } - - @Override - public String toString() { - Double d = null; - try { - d = detect(System.nanoTime()); - } catch (RuntimeException ex) {} - return "LocalGossipMember [uri=" + uri + ", heartbeat=" + heartbeat + ", clusterName=" - + clusterName + ", id=" + id + ", currentdetect=" + d +" ]"; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/51998219/src/main/java/org/apache/gossip/LocalMember.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/LocalMember.java b/src/main/java/org/apache/gossip/LocalMember.java new file mode 100644 index 0000000..db6e3f7 --- /dev/null +++ b/src/main/java/org/apache/gossip/LocalMember.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip; + +import java.net.URI; +import java.util.Map; + +import org.apache.gossip.accrual.FailureDetector; + +/** + * This object represent a gossip member with the properties known locally. These objects are stored + * in the local list of gossip members. + * + */ +public class LocalMember extends Member { + /** The failure detector for this member */ + private transient FailureDetector detector; + + /** + * + * @param uri + * The uri of the member + * @param id + * id of the node + * @param heartbeat + * The current heartbeat + */ + public LocalMember(String clusterName, URI uri, String id, + long heartbeat, Map<String,String> properties, int windowSize, int minSamples, String distribution) { + super(clusterName, uri, id, heartbeat, properties ); + detector = new FailureDetector(this, minSamples, windowSize, distribution); + } + + protected LocalMember(){ + + } + + public void recordHeartbeat(long now){ + detector.recordHeartbeat(now); + } + + public Double detect(long now) { + return detector.computePhiMeasure(now); + } + + @Override + public String toString() { + Double d = null; + try { + d = detect(System.nanoTime()); + } catch (RuntimeException ex) {} + return "LocalGossipMember [uri=" + uri + ", heartbeat=" + heartbeat + ", clusterName=" + + clusterName + ", id=" + id + ", currentdetect=" + d +" ]"; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/51998219/src/main/java/org/apache/gossip/Member.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/Member.java b/src/main/java/org/apache/gossip/Member.java new file mode 100644 index 0000000..d04a7b6 --- /dev/null +++ b/src/main/java/org/apache/gossip/Member.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip; + +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.Map; + +/** + * A abstract class representing a gossip member. + * + */ +public abstract class Member implements Comparable<Member> { + + + protected URI uri; + + protected volatile long heartbeat; + + protected String clusterName; + + /** + * The purpose of the id field is to be able for nodes to identify themselves beyond their + * host/port. For example an application might generate a persistent id so if they rejoin the + * cluster at a different host and port we are aware it is the same node. + */ + protected String id; + + /* properties provided at startup time */ + protected Map<String,String> properties; + + /** + * Constructor. + * + * @param clusterName + * The name of the cluster + * @param uri + * A URI object containing IP/hostname and port + * @param heartbeat + * The current heartbeat + * @param id + * An id that may be replaced after contact + */ + public Member(String clusterName, URI uri, String id, long heartbeat, Map<String,String> properties) { + this.clusterName = clusterName; + this.id = id; + this.heartbeat = heartbeat; + this.uri = uri; + this.properties = properties; + } + + protected Member(){} + /** + * Get the name of the cluster the member belongs to. + * + * @return The cluster name + */ + public String getClusterName() { + return clusterName; + } + + + /** + * @return The member address in the form IP/host:port Similar to the toString in + * {@link InetSocketAddress} + */ + public String computeAddress() { + return uri.getHost() + ":" + uri.getPort(); + } + + /** + * Get the heartbeat of this gossip member. + * + * @return The current heartbeat. + */ + public long getHeartbeat() { + return heartbeat; + } + + /** + * Set the heartbeat of this gossip member. + * + * @param heartbeat + * The new heartbeat. + */ + public void setHeartbeat(long heartbeat) { + this.heartbeat = heartbeat; + } + + public String getId() { + return id; + } + + public void setId(String _id) { + this.id = _id; + } + + public Map<String, String> getProperties() { + return properties; + } + + public void setProperties(Map<String, String> properties) { + this.properties = properties; + } + + public String toString() { + return "Member [address=" + computeAddress() + ", id=" + id + ", heartbeat=" + heartbeat + "]"; + } + + /** + * @see java.lang.Object#hashCode() + */ + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + String address = computeAddress(); + result = prime * result + ((address == null) ? 0 : address.hashCode()) + (clusterName == null ? 0 + : clusterName.hashCode()); + return result; + } + + public URI getUri() { + return uri; + } + + /** + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + System.err.println("equals(): obj is null."); + return false; + } + if (!(obj instanceof Member)) { + System.err.println("equals(): obj is not of type GossipMember."); + return false; + } + // The object is the same of they both have the same address (hostname and port). + return computeAddress().equals(((LocalMember) obj).computeAddress()) + && getClusterName().equals(((LocalMember) obj).getClusterName()); + } + + public int compareTo(Member other) { + return this.computeAddress().compareTo(other.computeAddress()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/51998219/src/main/java/org/apache/gossip/RemoteGossipMember.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/RemoteGossipMember.java b/src/main/java/org/apache/gossip/RemoteGossipMember.java deleted file mode 100644 index e3f6620..0000000 --- a/src/main/java/org/apache/gossip/RemoteGossipMember.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gossip; - -import java.net.URI; -import java.util.HashMap; -import java.util.Map; - -/** - * The object represents a gossip member with the properties as received from a remote gossip - * member. - * - */ -public class RemoteGossipMember extends GossipMember { - - /** - * Constructor. - * - * @param uri - * A URI object containing IP/hostname and port - * @param heartbeat - * The current heartbeat - */ - public RemoteGossipMember(String clusterName, URI uri, String id, long heartbeat, Map<String,String> properties) { - super(clusterName, uri, id, heartbeat, properties); - } - - public RemoteGossipMember(String clusterName, URI uri, String id) { - super(clusterName, uri, id, System.nanoTime(), new HashMap<String,String>()); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/51998219/src/main/java/org/apache/gossip/RemoteMember.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/RemoteMember.java b/src/main/java/org/apache/gossip/RemoteMember.java new file mode 100644 index 0000000..6b42da2 --- /dev/null +++ b/src/main/java/org/apache/gossip/RemoteMember.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; + +/** + * The object represents a gossip member with the properties as received from a remote gossip + * member. + * + */ +public class RemoteMember extends Member { + + /** + * Constructor. + * + * @param uri + * A URI object containing IP/hostname and port + * @param heartbeat + * The current heartbeat + */ + public RemoteMember(String clusterName, URI uri, String id, long heartbeat, Map<String,String> properties) { + super(clusterName, uri, id, heartbeat, properties); + } + + public RemoteMember(String clusterName, URI uri, String id) { + super(clusterName, uri, id, System.nanoTime(), new HashMap<String,String>()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/51998219/src/main/java/org/apache/gossip/StartupSettings.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/StartupSettings.java b/src/main/java/org/apache/gossip/StartupSettings.java index ab5f764..17eaaf2 100644 --- a/src/main/java/org/apache/gossip/StartupSettings.java +++ b/src/main/java/org/apache/gossip/StartupSettings.java @@ -53,7 +53,7 @@ public class StartupSettings { private final GossipSettings gossipSettings; /** The list with gossip members to start with. */ - private final List<GossipMember> gossipMembers; + private final List<Member> gossipMembers; /** * Constructor. @@ -135,7 +135,7 @@ public class StartupSettings { * @param member * The member to add. */ - public void addGossipMember(GossipMember member) { + public void addGossipMember(Member member) { gossipMembers.add(member); } @@ -144,7 +144,7 @@ public class StartupSettings { * * @return The gossip members. */ - public List<GossipMember> getGossipMembers() { + public List<Member> getGossipMembers() { return gossipMembers; } @@ -195,7 +195,7 @@ public class StartupSettings { while (it.hasNext()){ JsonNode child = it.next(); URI uri3 = new URI(child.get("uri").textValue()); - RemoteGossipMember member = new RemoteGossipMember(child.get("cluster").asText(), + RemoteMember member = new RemoteMember(child.get("cluster").asText(), uri3, "", 0, new HashMap<String,String>()); settings.addGossipMember(member); configMembersDetails += member.computeAddress(); http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/51998219/src/main/java/org/apache/gossip/accrual/FailureDetector.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/accrual/FailureDetector.java b/src/main/java/org/apache/gossip/accrual/FailureDetector.java index 10d66a9..22e73db 100644 --- a/src/main/java/org/apache/gossip/accrual/FailureDetector.java +++ b/src/main/java/org/apache/gossip/accrual/FailureDetector.java @@ -21,7 +21,7 @@ import org.apache.commons.math.MathException; import org.apache.commons.math.distribution.ExponentialDistributionImpl; import org.apache.commons.math.distribution.NormalDistributionImpl; import org.apache.commons.math.stat.descriptive.DescriptiveStatistics; -import org.apache.gossip.LocalGossipMember; +import org.apache.gossip.LocalMember; import org.apache.log4j.Logger; public class FailureDetector { @@ -30,10 +30,10 @@ public class FailureDetector { private final DescriptiveStatistics descriptiveStatistics; private final long minimumSamples; private volatile long latestHeartbeatMs = -1; - private final LocalGossipMember parent; + private final LocalMember parent; private final String distribution; - public FailureDetector(LocalGossipMember parent, long minimumSamples, int windowSize, String distribution){ + public FailureDetector(LocalMember parent, long minimumSamples, int windowSize, String distribution){ this.parent = parent; descriptiveStatistics = new DescriptiveStatistics(windowSize); this.minimumSamples = minimumSamples; http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/51998219/src/main/java/org/apache/gossip/event/GossipListener.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/event/GossipListener.java b/src/main/java/org/apache/gossip/event/GossipListener.java index 2e882f6..9b33dab 100644 --- a/src/main/java/org/apache/gossip/event/GossipListener.java +++ b/src/main/java/org/apache/gossip/event/GossipListener.java @@ -17,8 +17,8 @@ */ package org.apache.gossip.event; -import org.apache.gossip.GossipMember; +import org.apache.gossip.Member; public interface GossipListener { - void gossipEvent(GossipMember member, GossipState state); + void gossipEvent(Member member, GossipState state); } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/51998219/src/main/java/org/apache/gossip/examples/GossipExample.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/examples/GossipExample.java b/src/main/java/org/apache/gossip/examples/GossipExample.java deleted file mode 100644 index 8236d46..0000000 --- a/src/main/java/org/apache/gossip/examples/GossipExample.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gossip.examples; - -import com.codahale.metrics.MetricRegistry; -import java.net.InetAddress; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; - -import org.apache.gossip.GossipMember; -import org.apache.gossip.GossipService; -import org.apache.gossip.GossipSettings; -import org.apache.gossip.RemoteGossipMember; - -/** - * This class is an example of how one could use the gossip service. Here we start multiple gossip - * clients on this host as specified in the config file. - * - * @author harmenw - */ -public class GossipExample extends Thread { - /** The number of clients to start. */ - private static final int NUMBER_OF_CLIENTS = 4; - - /** - * @param args - */ - public static void main(String[] args) { - new GossipExample(); - } - - /** - * Constructor. This will start the this thread. - */ - public GossipExample() { - start(); - } - - /** - * @see java.lang.Thread#run() - */ - public void run() { - try { - GossipSettings settings = new GossipSettings(); - List<GossipService> clients = new ArrayList<>(); - String myIpAddress = InetAddress.getLocalHost().getHostAddress(); - String cluster = "My Gossip Cluster"; - - // Create the gossip members and put them in a list and give them a port number starting with - // 2000. - List<GossipMember> startupMembers = new ArrayList<>(); - for (int i = 0; i < NUMBER_OF_CLIENTS; ++i) { - URI u; - try { - u = new URI("udp://" + myIpAddress + ":" + (2000 + i)); - } catch (URISyntaxException e) { - throw new RuntimeException(e); - } - startupMembers.add(new RemoteGossipMember(cluster, u, "", 0, new HashMap<String,String>())); - } - - // Lets start the gossip clients. - // Start the clients, waiting cleaning-interval + 1 second between them which will show the - // dead list handling. - for (GossipMember member : startupMembers) { - GossipService gossipService = new GossipService(cluster, member.getUri(), "", new HashMap<String,String>(), - startupMembers, settings, null, new MetricRegistry()); - clients.add(gossipService); - gossipService.start(); - sleep(settings.getCleanupInterval() + 1000); - } - - // After starting all gossip clients, first wait 10 seconds and then shut them down. - sleep(10000); - System.err.println("Going to shutdown all services..."); - // Since they all run in the same virtual machine and share the same executor, if one is - // shutdown they will all stop. - clients.get(0).shutdown(); - - } catch (UnknownHostException e) { - e.printStackTrace(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/51998219/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java b/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java index dfeabd7..357e316 100644 --- a/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java +++ b/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java @@ -24,12 +24,11 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Map; -import org.apache.gossip.GossipService; import org.apache.gossip.GossipSettings; -import org.apache.gossip.RemoteGossipMember; +import org.apache.gossip.RemoteMember; import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper; - -import com.codahale.metrics.MetricRegistry; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.GossipManagerBuilder; public class StandAloneDatacenterAndRack { @@ -43,18 +42,21 @@ public class StandAloneDatacenterAndRack { gossipProps.put("sameRackGossipIntervalMs", "2000"); gossipProps.put("differentDatacenterGossipIntervalMs", "10000"); s.setActiveGossipProperties(gossipProps); - - Map<String, String> props = new HashMap<>(); props.put(DatacenterRackAwareActiveGossiper.DATACENTER, args[4]); props.put(DatacenterRackAwareActiveGossiper.RACK, args[5]); - GossipService gossipService = new GossipService("mycluster", URI.create(args[0]), args[1], - props, Arrays.asList(new RemoteGossipMember("mycluster", URI.create(args[2]), args[3])), - s, (a, b) -> { }, new MetricRegistry()); - gossipService.start(); + GossipManager manager = GossipManagerBuilder.newBuilder() + .cluster("mycluster") + .uri(URI.create(args[0])) + .id(args[1]) + .gossipSettings(s) + .gossipMembers(Arrays.asList(new RemoteMember("mycluster", URI.create(args[2]), args[3]))) + .properties(props) + .build(); + manager.init(); while (true){ - System.out.println("Live: " + gossipService.getGossipManager().getLiveMembers()); - System.out.println("Dead: " + gossipService.getGossipManager().getDeadMembers()); + System.out.println("Live: " + manager.getLiveMembers()); + System.out.println("Dead: " + manager.getDeadMembers()); Thread.sleep(2000); } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/51998219/src/main/java/org/apache/gossip/examples/StandAloneNode.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/examples/StandAloneNode.java b/src/main/java/org/apache/gossip/examples/StandAloneNode.java index 3564943..b38865e 100644 --- a/src/main/java/org/apache/gossip/examples/StandAloneNode.java +++ b/src/main/java/org/apache/gossip/examples/StandAloneNode.java @@ -17,15 +17,13 @@ */ package org.apache.gossip.examples; -import com.codahale.metrics.MetricRegistry; import java.net.URI; import java.net.UnknownHostException; import java.util.Arrays; -import java.util.HashMap; - -import org.apache.gossip.GossipService; import org.apache.gossip.GossipSettings; -import org.apache.gossip.RemoteGossipMember; +import org.apache.gossip.RemoteMember; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.GossipManagerBuilder; public class StandAloneNode { public static void main (String [] args) throws UnknownHostException, InterruptedException{ @@ -33,12 +31,17 @@ public class StandAloneNode { s.setWindowSize(10); s.setConvictThreshold(1.0); s.setGossipInterval(10); - GossipService gossipService = new GossipService("mycluster", URI.create(args[0]), args[1], new HashMap<String, String>(), - Arrays.asList( new RemoteGossipMember("mycluster", URI.create(args[2]), args[3])), s, (a,b) -> {}, new MetricRegistry()); - gossipService.start(); + GossipManager gossipService = GossipManagerBuilder.newBuilder() + .cluster("mycluster") + .uri(URI.create(args[0])) + .id(args[1]) + .gossipMembers(Arrays.asList( new RemoteMember("mycluster", URI.create(args[2]), args[3]))) + .gossipSettings(s) + .build(); + gossipService.init(); while (true){ - System.out.println( "Live: " + gossipService.getGossipManager().getLiveMembers()); - System.out.println( "Dead: " + gossipService.getGossipManager().getDeadMembers()); + System.out.println("Live: " + gossipService.getLiveMembers()); + System.out.println("Dead: " + gossipService.getDeadMembers()); Thread.sleep(2000); } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/51998219/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java b/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java index d1c1751..00f1279 100644 --- a/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java +++ b/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java @@ -17,20 +17,17 @@ */ package org.apache.gossip.examples; -import com.codahale.metrics.MetricRegistry; - import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.util.Arrays; -import java.util.HashMap; - -import org.apache.gossip.GossipService; import org.apache.gossip.GossipSettings; -import org.apache.gossip.RemoteGossipMember; +import org.apache.gossip.RemoteMember; import org.apache.gossip.crdt.OrSet; -import org.apache.gossip.model.SharedGossipDataMessage; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.GossipManagerBuilder; +import org.apache.gossip.model.SharedDataMessage; public class StandAloneNodeCrdtOrSet { public static void main (String [] args) throws InterruptedException, IOException{ @@ -38,17 +35,22 @@ public class StandAloneNodeCrdtOrSet { s.setWindowSize(10); s.setConvictThreshold(1.0); s.setGossipInterval(10); - GossipService gossipService = new GossipService("mycluster", URI.create(args[0]), args[1], new HashMap<String, String>(), - Arrays.asList( new RemoteGossipMember("mycluster", URI.create(args[2]), args[3])), s, (a,b) -> {}, new MetricRegistry()); - gossipService.start(); + GossipManager gossipService = GossipManagerBuilder.newBuilder() + .cluster("mycluster") + .uri(URI.create(args[0])) + .id(args[1]) + .gossipMembers(Arrays.asList( new RemoteMember("mycluster", URI.create(args[2]), args[3]))) + .gossipSettings(s) + .build(); + gossipService.init(); new Thread(() -> { while (true){ - System.out.println("Live: " + gossipService.getGossipManager().getLiveMembers()); - System.out.println("Dead: " + gossipService.getGossipManager().getDeadMembers()); - System.out.println("---------- " + (gossipService.getGossipManager().findCrdt("abc") == null ? "": - gossipService.getGossipManager().findCrdt("abc").value())); - System.out.println("********** " + gossipService.getGossipManager().findCrdt("abc")); + System.out.println("Live: " + gossipService.getLiveMembers()); + System.out.println("Dead: " + gossipService.getDeadMembers()); + System.out.println("---------- " + (gossipService.findCrdt("abc") == null ? "": + gossipService.findCrdt("abc").value())); + System.out.println("********** " + gossipService.findCrdt("abc")); try { Thread.sleep(2000); } catch (Exception e) {} @@ -70,22 +72,23 @@ public class StandAloneNodeCrdtOrSet { } } - private static void removeData(String val, GossipService gossipService){ - OrSet<String> s = (OrSet<String>) gossipService.getGossipManager().findCrdt("abc"); - SharedGossipDataMessage m = new SharedGossipDataMessage(); + private static void removeData(String val, GossipManager gossipService){ + @SuppressWarnings("unchecked") + OrSet<String> s = (OrSet<String>) gossipService.findCrdt("abc"); + SharedDataMessage m = new SharedDataMessage(); m.setExpireAt(Long.MAX_VALUE); m.setKey("abc"); m.setPayload(new OrSet<String>(s , new OrSet.Builder<String>().remove(val))); m.setTimestamp(System.currentTimeMillis()); - gossipService.getGossipManager().merge(m); + gossipService.merge(m); } - private static void addData(String val, GossipService gossipService){ - SharedGossipDataMessage m = new SharedGossipDataMessage(); + private static void addData(String val, GossipManager gossipService){ + SharedDataMessage m = new SharedDataMessage(); m.setExpireAt(Long.MAX_VALUE); m.setKey("abc"); m.setPayload(new OrSet<String>(val)); m.setTimestamp(System.currentTimeMillis()); - gossipService.getGossipManager().merge(m); + gossipService.merge(m); } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/51998219/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java b/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java index 9fea30b..b73550e 100644 --- a/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java +++ b/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java @@ -24,16 +24,16 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; -import org.apache.gossip.LocalGossipMember; +import org.apache.gossip.LocalMember; import org.apache.gossip.model.ActiveGossipOk; -import org.apache.gossip.model.GossipDataMessage; -import org.apache.gossip.model.GossipMember; +import org.apache.gossip.model.PerNodeDataMessage; +import org.apache.gossip.model.Member; import org.apache.gossip.model.Response; -import org.apache.gossip.model.SharedGossipDataMessage; +import org.apache.gossip.model.SharedDataMessage; import org.apache.gossip.model.ShutdownMessage; import org.apache.gossip.udp.UdpActiveGossipMessage; -import org.apache.gossip.udp.UdpGossipDataMessage; -import org.apache.gossip.udp.UdpSharedGossipDataMessage; +import org.apache.gossip.udp.UdpPerNodeDataMessage; +import org.apache.gossip.udp.UdpSharedDataMessage; import org.apache.log4j.Logger; import static com.codahale.metrics.MetricRegistry.name; @@ -69,7 +69,7 @@ public abstract class AbstractActiveGossiper { } - public final void sendShutdownMessage(LocalGossipMember me, LocalGossipMember target){ + public final void sendShutdownMessage(LocalMember me, LocalMember target){ if (target == null){ return; } @@ -79,13 +79,13 @@ public abstract class AbstractActiveGossiper { gossipCore.sendOneWay(m, target.getUri()); } - public final void sendSharedData(LocalGossipMember me, LocalGossipMember member){ + public final void sendSharedData(LocalMember me, LocalMember member){ if (member == null){ return; } long startTime = System.currentTimeMillis(); - for (Entry<String, SharedGossipDataMessage> innerEntry : gossipCore.getSharedData().entrySet()){ - UdpSharedGossipDataMessage message = new UdpSharedGossipDataMessage(); + for (Entry<String, SharedDataMessage> innerEntry : gossipCore.getSharedData().entrySet()){ + UdpSharedDataMessage message = new UdpSharedDataMessage(); message.setUuid(UUID.randomUUID().toString()); message.setUriFrom(me.getId()); message.setExpireAt(innerEntry.getValue().getExpireAt()); @@ -98,14 +98,14 @@ public abstract class AbstractActiveGossiper { sharedDataHistogram.update(System.currentTimeMillis() - startTime); } - public final void sendPerNodeData(LocalGossipMember me, LocalGossipMember member){ + public final void sendPerNodeData(LocalMember me, LocalMember member){ if (member == null){ return; } long startTime = System.currentTimeMillis(); - for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){ - for (Entry<String, GossipDataMessage> innerEntry : entry.getValue().entrySet()){ - UdpGossipDataMessage message = new UdpGossipDataMessage(); + for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){ + for (Entry<String, PerNodeDataMessage> innerEntry : entry.getValue().entrySet()){ + UdpPerNodeDataMessage message = new UdpPerNodeDataMessage(); message.setUuid(UUID.randomUUID().toString()); message.setUriFrom(me.getId()); message.setExpireAt(innerEntry.getValue().getExpireAt()); @@ -122,7 +122,7 @@ public abstract class AbstractActiveGossiper { /** * Performs the sending of the membership list, after we have incremented our own heartbeat. */ - protected void sendMembershipList(LocalGossipMember me, LocalGossipMember member) { + protected void sendMembershipList(LocalMember me, LocalMember member) { if (member == null){ return; } @@ -132,7 +132,7 @@ public abstract class AbstractActiveGossiper { message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString()); message.setUuid(UUID.randomUUID().toString()); message.getMembers().add(convert(me)); - for (LocalGossipMember other : gossipManager.getMembers().keySet()) { + for (LocalMember other : gossipManager.getMembers().keySet()) { message.getMembers().add(convert(other)); } Response r = gossipCore.send(message, member.getUri()); @@ -144,8 +144,8 @@ public abstract class AbstractActiveGossiper { sendMembershipHistorgram.update(System.currentTimeMillis() - startTime); } - protected final GossipMember convert(LocalGossipMember member){ - GossipMember gm = new GossipMember(); + protected final Member convert(LocalMember member){ + Member gm = new Member(); gm.setCluster(member.getClusterName()); gm.setHeartbeat(member.getHeartbeat()); gm.setUri(member.getUri().toASCIIString()); @@ -160,8 +160,8 @@ public abstract class AbstractActiveGossiper { * An immutable list * @return The chosen LocalGossipMember to gossip with. */ - protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) { - LocalGossipMember member = null; + protected LocalMember selectPartner(List<LocalMember> memberList) { + LocalMember member = null; if (memberList.size() > 0) { int randomNeighborIndex = random.nextInt(memberList.size()); member = memberList.get(randomNeighborIndex); http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/51998219/src/main/java/org/apache/gossip/manager/DataReaper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/DataReaper.java b/src/main/java/org/apache/gossip/manager/DataReaper.java index f165239..8175a1b 100644 --- a/src/main/java/org/apache/gossip/manager/DataReaper.java +++ b/src/main/java/org/apache/gossip/manager/DataReaper.java @@ -23,8 +23,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.gossip.model.GossipDataMessage; -import org.apache.gossip.model.SharedGossipDataMessage; +import org.apache.gossip.model.PerNodeDataMessage; +import org.apache.gossip.model.SharedDataMessage; /** * We wish to periodically sweep user data and remove entries past their timestamp. This @@ -53,7 +53,7 @@ public class DataReaper { } void runSharedOnce(){ - for (Entry<String, SharedGossipDataMessage> entry : gossipCore.getSharedData().entrySet()){ + for (Entry<String, SharedDataMessage> entry : gossipCore.getSharedData().entrySet()){ if (entry.getValue().getExpireAt() < clock.currentTimeMillis()){ gossipCore.getSharedData().remove(entry.getKey(), entry.getValue()); } @@ -61,13 +61,13 @@ public class DataReaper { } void runPerNodeOnce(){ - for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> node : gossipCore.getPerNodeData().entrySet()){ + for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> node : gossipCore.getPerNodeData().entrySet()){ reapData(node.getValue()); } } - void reapData(ConcurrentHashMap<String, GossipDataMessage> concurrentHashMap){ - for (Entry<String, GossipDataMessage> entry : concurrentHashMap.entrySet()){ + void reapData(ConcurrentHashMap<String, PerNodeDataMessage> concurrentHashMap){ + for (Entry<String, PerNodeDataMessage> entry : concurrentHashMap.entrySet()){ if (entry.getValue().getExpireAt() < clock.currentTimeMillis()){ concurrentHashMap.remove(entry.getKey(), entry.getValue()); } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/51998219/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java b/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java index c66e332..2f489a2 100644 --- a/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java +++ b/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java @@ -27,7 +27,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.apache.gossip.LocalGossipMember; +import org.apache.gossip.LocalMember; import com.codahale.metrics.MetricRegistry; @@ -130,14 +130,14 @@ public class DatacenterRackAwareActiveGossiper extends AbstractActiveGossiper { sendMembershipList(gossipManager.getMyself(), selectPartner(gossipManager.getDeadMembers())); } - private List<LocalGossipMember> differentDataCenter(){ + private List<LocalMember> differentDataCenter(){ String myDc = gossipManager.getMyself().getProperties().get(DATACENTER); String rack = gossipManager.getMyself().getProperties().get(RACK); if (myDc == null|| rack == null){ return Collections.emptyList(); } - List<LocalGossipMember> notMyDc = new ArrayList<LocalGossipMember>(10); - for (LocalGossipMember i : gossipManager.getLiveMembers()){ + List<LocalMember> notMyDc = new ArrayList<LocalMember>(10); + for (LocalMember i : gossipManager.getLiveMembers()){ if (!myDc.equals(i.getProperties().get(DATACENTER))){ notMyDc.add(i); } @@ -145,14 +145,14 @@ public class DatacenterRackAwareActiveGossiper extends AbstractActiveGossiper { return notMyDc; } - private List<LocalGossipMember> sameDatacenterDifferentRack(){ + private List<LocalMember> sameDatacenterDifferentRack(){ String myDc = gossipManager.getMyself().getProperties().get(DATACENTER); String rack = gossipManager.getMyself().getProperties().get(RACK); if (myDc == null|| rack == null){ return Collections.emptyList(); } - List<LocalGossipMember> notMyDc = new ArrayList<LocalGossipMember>(10); - for (LocalGossipMember i : gossipManager.getLiveMembers()){ + List<LocalMember> notMyDc = new ArrayList<LocalMember>(10); + for (LocalMember i : gossipManager.getLiveMembers()){ if (myDc.equals(i.getProperties().get(DATACENTER)) && !rack.equals(i.getProperties().get(RACK))){ notMyDc.add(i); } @@ -160,14 +160,14 @@ public class DatacenterRackAwareActiveGossiper extends AbstractActiveGossiper { return notMyDc; } - private List<LocalGossipMember> sameRackNodes(){ + private List<LocalMember> sameRackNodes(){ String myDc = gossipManager.getMyself().getProperties().get(DATACENTER); String rack = gossipManager.getMyself().getProperties().get(RACK); if (myDc == null|| rack == null){ return Collections.emptyList(); } - List<LocalGossipMember> sameDcAndRack = new ArrayList<LocalGossipMember>(10); - for (LocalGossipMember i : gossipManager.getLiveMembers()){ + List<LocalMember> sameDcAndRack = new ArrayList<LocalMember>(10); + for (LocalMember i : gossipManager.getLiveMembers()){ if (myDc.equals(i.getProperties().get(DATACENTER)) && rack.equals(i.getProperties().get(RACK))){ sameDcAndRack.add(i); @@ -177,7 +177,7 @@ public class DatacenterRackAwareActiveGossiper extends AbstractActiveGossiper { } private void sendToSameRackMember() { - LocalGossipMember i = selectPartner(sameRackNodes()); + LocalMember i = selectPartner(sameRackNodes()); sendMembershipList(gossipManager.getMyself(), i); } @@ -235,7 +235,7 @@ public class DatacenterRackAwareActiveGossiper extends AbstractActiveGossiper { * sends an optimistic shutdown message to several clusters nodes */ protected void sendShutdownMessage(){ - List<LocalGossipMember> l = gossipManager.getLiveMembers(); + List<LocalMember> l = gossipManager.getLiveMembers(); int sendTo = l.size() < 3 ? 1 : l.size() / 3; for (int i = 0; i < sendTo; i++) { threadService.execute(() -> sendShutdownMessage(gossipManager.getMyself(), selectPartner(l))); http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/51998219/src/main/java/org/apache/gossip/manager/GossipCore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java index a24b125..e3dcb21 100644 --- a/src/main/java/org/apache/gossip/manager/GossipCore.java +++ b/src/main/java/org/apache/gossip/manager/GossipCore.java @@ -20,9 +20,9 @@ package org.apache.gossip.manager; import com.codahale.metrics.Gauge; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; -import org.apache.gossip.GossipMember; -import org.apache.gossip.LocalGossipMember; -import org.apache.gossip.RemoteGossipMember; +import org.apache.gossip.Member; +import org.apache.gossip.LocalMember; +import org.apache.gossip.RemoteMember; import org.apache.gossip.crdt.Crdt; import org.apache.gossip.event.GossipState; import org.apache.gossip.model.*; @@ -49,8 +49,8 @@ public class GossipCore implements GossipCoreConstants { private final GossipManager gossipManager; private ConcurrentHashMap<String, Base> requests; private ThreadPoolExecutor service; - private final ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>> perNodeData; - private final ConcurrentHashMap<String, SharedGossipDataMessage> sharedData; + private final ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> perNodeData; + private final ConcurrentHashMap<String, SharedDataMessage> sharedData; private final BlockingQueue<Runnable> workQueue; private final PKCS8EncodedKeySpec privKeySpec; private final PrivateKey privKey; @@ -113,14 +113,14 @@ public class GossipCore implements GossipCoreConstants { } @SuppressWarnings({ "unchecked", "rawtypes" }) - public void addSharedData(SharedGossipDataMessage message) { + public void addSharedData(SharedDataMessage message) { while (true){ - SharedGossipDataMessage previous = sharedData.putIfAbsent(message.getKey(), message); + SharedDataMessage previous = sharedData.putIfAbsent(message.getKey(), message); if (previous == null){ return; } if (message.getPayload() instanceof Crdt){ - SharedGossipDataMessage merged = new SharedGossipDataMessage(); + SharedDataMessage merged = new SharedDataMessage(); merged.setExpireAt(message.getExpireAt()); merged.setKey(message.getKey()); merged.setNodeId(message.getNodeId()); @@ -144,12 +144,12 @@ public class GossipCore implements GossipCoreConstants { } } - public void addPerNodeData(GossipDataMessage message){ - ConcurrentHashMap<String,GossipDataMessage> nodeMap = new ConcurrentHashMap<>(); + public void addPerNodeData(PerNodeDataMessage message){ + ConcurrentHashMap<String,PerNodeDataMessage> nodeMap = new ConcurrentHashMap<>(); nodeMap.put(message.getKey(), message); nodeMap = perNodeData.putIfAbsent(message.getNodeId(), nodeMap); if (nodeMap != null){ - GossipDataMessage current = nodeMap.get(message.getKey()); + PerNodeDataMessage current = nodeMap.get(message.getKey()); if (current == null){ nodeMap.putIfAbsent(message.getKey(), message); } else { @@ -160,11 +160,11 @@ public class GossipCore implements GossipCoreConstants { } } - public ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>> getPerNodeData(){ + public ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> getPerNodeData(){ return perNodeData; } - public ConcurrentHashMap<String, SharedGossipDataMessage> getSharedData() { + public ConcurrentHashMap<String, SharedDataMessage> getSharedData() { return sharedData; } @@ -314,12 +314,12 @@ public class GossipCore implements GossipCoreConstants { * @param remoteList * */ - public void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, - List<GossipMember> remoteList) { + public void mergeLists(GossipManager gossipManager, RemoteMember senderMember, + List<Member> remoteList) { if (LOGGER.isDebugEnabled()){ debugState(senderMember, remoteList); } - for (LocalGossipMember i : gossipManager.getDeadMembers()) { + for (LocalMember i : gossipManager.getDeadMembers()) { if (i.getId().equals(senderMember.getId())) { LOGGER.debug(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri()); i.recordHeartbeat(senderMember.getHeartbeat()); @@ -327,11 +327,11 @@ public class GossipCore implements GossipCoreConstants { //TODO consider forcing an UP here } } - for (GossipMember remoteMember : remoteList) { + for (Member remoteMember : remoteList) { if (remoteMember.getId().equals(gossipManager.getMyself().getId())) { continue; } - LocalGossipMember aNewMember = new LocalGossipMember(remoteMember.getClusterName(), + LocalMember aNewMember = new LocalMember(remoteMember.getClusterName(), remoteMember.getUri(), remoteMember.getId(), remoteMember.getHeartbeat(), @@ -342,7 +342,7 @@ public class GossipCore implements GossipCoreConstants { aNewMember.recordHeartbeat(remoteMember.getHeartbeat()); Object result = gossipManager.getMembers().putIfAbsent(aNewMember, GossipState.UP); if (result != null){ - for (Entry<LocalGossipMember, GossipState> localMember : gossipManager.getMembers().entrySet()){ + for (Entry<LocalMember, GossipState> localMember : gossipManager.getMembers().entrySet()){ if (localMember.getKey().getId().equals(remoteMember.getId())){ localMember.getKey().recordHeartbeat(remoteMember.getHeartbeat()); localMember.getKey().setHeartbeat(remoteMember.getHeartbeat()); @@ -356,8 +356,8 @@ public class GossipCore implements GossipCoreConstants { } } - private void debugState(RemoteGossipMember senderMember, - List<GossipMember> remoteList){ + private void debugState(RemoteMember senderMember, + List<Member> remoteList){ LOGGER.warn( "-----------------------\n" + "Me " + gossipManager.getMyself() + "\n" + @@ -369,13 +369,13 @@ public class GossipCore implements GossipCoreConstants { } @SuppressWarnings("rawtypes") - public Crdt merge(SharedGossipDataMessage message) { + public Crdt merge(SharedDataMessage message) { for (;;){ - SharedGossipDataMessage previous = sharedData.putIfAbsent(message.getKey(), message); + SharedDataMessage previous = sharedData.putIfAbsent(message.getKey(), message); if (previous == null){ return (Crdt) message.getPayload(); } - SharedGossipDataMessage copy = new SharedGossipDataMessage(); + SharedDataMessage copy = new SharedDataMessage(); copy.setExpireAt(message.getExpireAt()); copy.setKey(message.getKey()); copy.setNodeId(message.getNodeId()); http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/51998219/src/main/java/org/apache/gossip/manager/GossipManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java index 4b28f2f..ba8517b 100644 --- a/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -19,16 +19,16 @@ package org.apache.gossip.manager; import com.codahale.metrics.MetricRegistry; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.gossip.GossipMember; +import org.apache.gossip.Member; import org.apache.gossip.GossipSettings; -import org.apache.gossip.LocalGossipMember; +import org.apache.gossip.LocalMember; import org.apache.gossip.crdt.Crdt; import org.apache.gossip.event.GossipListener; import org.apache.gossip.event.GossipState; import org.apache.gossip.manager.handlers.MessageInvoker; import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread; -import org.apache.gossip.model.GossipDataMessage; -import org.apache.gossip.model.SharedGossipDataMessage; +import org.apache.gossip.model.PerNodeDataMessage; +import org.apache.gossip.model.SharedDataMessage; import org.apache.gossip.model.ShutdownMessage; import org.apache.log4j.Logger; @@ -44,13 +44,12 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; - public abstract class GossipManager { public static final Logger LOGGER = Logger.getLogger(GossipManager.class); - private final ConcurrentSkipListMap<LocalGossipMember, GossipState> members; - private final LocalGossipMember me; + private final ConcurrentSkipListMap<LocalMember, GossipState> members; + private final LocalMember me; private final GossipSettings settings; private final AtomicBoolean gossipServiceRunning; private final GossipListener listener; @@ -70,19 +69,19 @@ public abstract class GossipManager { public GossipManager(String cluster, URI uri, String id, Map<String, String> properties, GossipSettings settings, - List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry, + List<Member> gossipMembers, GossipListener listener, MetricRegistry registry, ObjectMapper objectMapper, MessageInvoker messageInvoker) { this.settings = settings; this.messageInvoker = messageInvoker; clock = new SystemClock(); - me = new LocalGossipMember(cluster, uri, id, clock.nanoTime(), properties, + me = new LocalMember(cluster, uri, id, clock.nanoTime(), properties, settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution()); gossipCore = new GossipCore(this, registry); dataReaper = new DataReaper(gossipCore, clock); members = new ConcurrentSkipListMap<>(); - for (GossipMember startupMember : gossipMembers) { + for (Member startupMember : gossipMembers) { if (!startupMember.equals(me)) { - LocalGossipMember member = new LocalGossipMember(startupMember.getClusterName(), + LocalMember member = new LocalMember(startupMember.getClusterName(), startupMember.getUri(), startupMember.getId(), clock.nanoTime(), startupMember.getProperties(), settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution()); @@ -106,7 +105,7 @@ public abstract class GossipManager { return messageInvoker; } - public ConcurrentSkipListMap<LocalGossipMember, GossipState> getMembers() { + public ConcurrentSkipListMap<LocalMember, GossipState> getMembers() { return members; } @@ -117,7 +116,7 @@ public abstract class GossipManager { /** * @return a read only list of members found in the DOWN state. */ - public List<LocalGossipMember> getDeadMembers() { + public List<LocalMember> getDeadMembers() { return Collections.unmodifiableList( members.entrySet() .stream() @@ -129,7 +128,7 @@ public abstract class GossipManager { * * @return a read only list of members found in the UP state */ - public List<LocalGossipMember> getLiveMembers() { + public List<LocalMember> getLiveMembers() { return Collections.unmodifiableList( members.entrySet() .stream() @@ -137,7 +136,7 @@ public abstract class GossipManager { .map(Entry::getKey).collect(Collectors.toList())); } - public LocalGossipMember getMyself() { + public LocalMember getMyself() { return me; } @@ -164,7 +163,7 @@ public abstract class GossipManager { scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, TimeUnit.SECONDS); scheduledServiced.scheduleAtFixedRate(() -> { try { - for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()) { + for (Entry<LocalMember, GossipState> entry : members.entrySet()) { boolean userDown = processOptomisticShutdown(entry); if (userDown) continue; @@ -205,8 +204,8 @@ public abstract class GossipManager { * @param l member to consider * @return true if node forced down */ - public boolean processOptomisticShutdown(Entry<LocalGossipMember, GossipState> l){ - GossipDataMessage m = findPerNodeGossipData(l.getKey().getId(), ShutdownMessage.PER_NODE_KEY); + public boolean processOptomisticShutdown(Entry<LocalMember, GossipState> l){ + PerNodeDataMessage m = findPerNodeGossipData(l.getKey().getId(), ShutdownMessage.PER_NODE_KEY); if (m == null){ return false; } @@ -224,8 +223,8 @@ public abstract class GossipManager { } private void readSavedRingState() { - for (LocalGossipMember l : ringState.readFromDisk()){ - LocalGossipMember member = new LocalGossipMember(l.getClusterName(), + for (LocalMember l : ringState.readFromDisk()){ + LocalMember member = new LocalMember(l.getClusterName(), l.getUri(), l.getId(), clock.nanoTime(), l.getProperties(), settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution()); @@ -234,12 +233,12 @@ public abstract class GossipManager { } private void readSavedDataState() { - for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> l : userDataState.readPerNodeFromDisk().entrySet()){ - for (Entry<String, GossipDataMessage> j : l.getValue().entrySet()){ + for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> l : userDataState.readPerNodeFromDisk().entrySet()){ + for (Entry<String, PerNodeDataMessage> j : l.getValue().entrySet()){ gossipCore.addPerNodeData(j.getValue()); } } - for (Entry<String, SharedGossipDataMessage> l: userDataState.readSharedDataFromDisk().entrySet()){ + for (Entry<String, SharedDataMessage> l: userDataState.readSharedDataFromDisk().entrySet()){ gossipCore.addSharedData(l.getValue()); } } @@ -276,7 +275,7 @@ public abstract class GossipManager { scheduledServiced.shutdownNow(); } - public void gossipPerNodeData(GossipDataMessage message){ + public void gossipPerNodeData(PerNodeDataMessage message){ Objects.nonNull(message.getKey()); Objects.nonNull(message.getTimestamp()); Objects.nonNull(message.getPayload()); @@ -284,7 +283,7 @@ public abstract class GossipManager { gossipCore.addPerNodeData(message); } - public void gossipSharedData(SharedGossipDataMessage message){ + public void gossipSharedData(SharedDataMessage message){ Objects.nonNull(message.getKey()); Objects.nonNull(message.getTimestamp()); Objects.nonNull(message.getPayload()); @@ -295,7 +294,7 @@ public abstract class GossipManager { @SuppressWarnings("rawtypes") public Crdt findCrdt(String key){ - SharedGossipDataMessage l = gossipCore.getSharedData().get(key); + SharedDataMessage l = gossipCore.getSharedData().get(key); if (l == null){ return null; } @@ -307,7 +306,7 @@ public abstract class GossipManager { } @SuppressWarnings("rawtypes") - public Crdt merge(SharedGossipDataMessage message){ + public Crdt merge(SharedDataMessage message){ Objects.nonNull(message.getKey()); Objects.nonNull(message.getTimestamp()); Objects.nonNull(message.getPayload()); @@ -318,12 +317,12 @@ public abstract class GossipManager { return gossipCore.merge(message); } - public GossipDataMessage findPerNodeGossipData(String nodeId, String key){ - ConcurrentHashMap<String, GossipDataMessage> j = gossipCore.getPerNodeData().get(nodeId); + public PerNodeDataMessage findPerNodeGossipData(String nodeId, String key){ + ConcurrentHashMap<String, PerNodeDataMessage> j = gossipCore.getPerNodeData().get(nodeId); if (j == null){ return null; } else { - GossipDataMessage l = j.get(key); + PerNodeDataMessage l = j.get(key); if (l == null){ return null; } @@ -334,8 +333,8 @@ public abstract class GossipManager { } } - public SharedGossipDataMessage findSharedGossipData(String key){ - SharedGossipDataMessage l = gossipCore.getSharedData().get(key); + public SharedDataMessage findSharedGossipData(String key){ + SharedDataMessage l = gossipCore.getSharedData().get(key); if (l == null){ return null; } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/51998219/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java b/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java new file mode 100644 index 0000000..b87045b --- /dev/null +++ b/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager; + +import com.codahale.metrics.MetricRegistry; +import com.fasterxml.jackson.core.JsonGenerator.Feature; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.gossip.Member; +import org.apache.gossip.GossipSettings; +import org.apache.gossip.StartupSettings; +import org.apache.gossip.crdt.CrdtModule; +import org.apache.gossip.event.GossipListener; +import org.apache.gossip.manager.handlers.DefaultMessageInvoker; +import org.apache.gossip.manager.handlers.MessageInvoker; + +import java.net.URI; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class GossipManagerBuilder { + + public static ManagerBuilder newBuilder() { + return new ManagerBuilder(); + } + + public static final class ManagerBuilder { + private String cluster; + private URI uri; + private String id; + private GossipSettings settings; + private List<Member> gossipMembers; + private GossipListener listener; + private MetricRegistry registry; + private Map<String,String> properties; + private ObjectMapper objectMapper; + private MessageInvoker messageInvoker; + + private ManagerBuilder() {} + + private void checkArgument(boolean check, String msg) { + if (!check) { + throw new IllegalArgumentException(msg); + } + } + + public ManagerBuilder cluster(String cluster) { + this.cluster = cluster; + return this; + } + + public ManagerBuilder properties(Map<String,String> properties) { + this.properties = properties; + return this; + } + + public ManagerBuilder id(String id) { + this.id = id; + return this; + } + + public ManagerBuilder gossipSettings(GossipSettings settings) { + this.settings = settings; + return this; + } + + public ManagerBuilder startupSettings(StartupSettings startupSettings) { + this.cluster = startupSettings.getCluster(); + this.id = startupSettings.getId(); + this.settings = startupSettings.getGossipSettings(); + this.gossipMembers = startupSettings.getGossipMembers(); + this.uri = startupSettings.getUri(); + return this; + } + + public ManagerBuilder gossipMembers(List<Member> members) { + this.gossipMembers = members; + return this; + } + + public ManagerBuilder listener(GossipListener listener) { + this.listener = listener; + return this; + } + + public ManagerBuilder registry(MetricRegistry registry) { + this.registry = registry; + return this; + } + + public ManagerBuilder uri(URI uri){ + this.uri = uri; + return this; + } + + public ManagerBuilder mapper(ObjectMapper objectMapper){ + this.objectMapper = objectMapper; + return this; + } + + public ManagerBuilder messageInvoker(MessageInvoker messageInvoker) { + this.messageInvoker = messageInvoker; + return this; + } + + public GossipManager build() { + checkArgument(id != null, "You must specify an id"); + checkArgument(cluster != null, "You must specify a cluster name"); + checkArgument(settings != null, "You must specify gossip settings"); + checkArgument(uri != null, "You must specify a uri"); + if (registry == null){ + registry = new MetricRegistry(); + } + if (properties == null){ + properties = new HashMap<String,String>(); + } + if (listener == null){ + listener((a,b) -> {}); + } + if (gossipMembers == null) { + gossipMembers = new ArrayList<>(); + } + if (objectMapper == null) { + objectMapper = new ObjectMapper(); + objectMapper.enableDefaultTyping(); + objectMapper.registerModule(new CrdtModule()); + objectMapper.configure(Feature.WRITE_NUMBERS_AS_STRINGS, false); + } + if (messageInvoker == null) { + messageInvoker = new DefaultMessageInvoker(); + } + return new GossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper, messageInvoker) {} ; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/51998219/src/main/java/org/apache/gossip/manager/RingStatePersister.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/RingStatePersister.java b/src/main/java/org/apache/gossip/manager/RingStatePersister.java index 6f724e0..7e42562 100644 --- a/src/main/java/org/apache/gossip/manager/RingStatePersister.java +++ b/src/main/java/org/apache/gossip/manager/RingStatePersister.java @@ -26,7 +26,7 @@ import java.util.Collections; import java.util.List; import java.util.NavigableSet; -import org.apache.gossip.LocalGossipMember; +import org.apache.gossip.LocalMember; import org.apache.log4j.Logger; public class RingStatePersister implements Runnable { @@ -52,7 +52,7 @@ public class RingStatePersister implements Runnable { if (!parent.getSettings().isPersistRingState()){ return; } - NavigableSet<LocalGossipMember> i = parent.getMembers().keySet(); + NavigableSet<LocalMember> i = parent.getMembers().keySet(); try (FileOutputStream fos = new FileOutputStream(computeTarget())){ parent.getObjectMapper().writeValue(fos, i); } catch (IOException e) { @@ -61,7 +61,7 @@ public class RingStatePersister implements Runnable { } @SuppressWarnings("unchecked") - List<LocalGossipMember> readFromDisk(){ + List<LocalMember> readFromDisk(){ if (!parent.getSettings().isPersistRingState()){ return Collections.emptyList(); } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/51998219/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java b/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java index 839d796..e47fe2a 100644 --- a/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java +++ b/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java @@ -25,7 +25,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.apache.gossip.LocalGossipMember; +import org.apache.gossip.LocalMember; import com.codahale.metrics.MetricRegistry; @@ -88,12 +88,12 @@ public class SimpleActiveGossipper extends AbstractActiveGossiper { } protected void sendToALiveMember(){ - LocalGossipMember member = selectPartner(gossipManager.getLiveMembers()); + LocalMember member = selectPartner(gossipManager.getLiveMembers()); sendMembershipList(gossipManager.getMyself(), member); } protected void sendToDeadMember(){ - LocalGossipMember member = selectPartner(gossipManager.getDeadMembers()); + LocalMember member = selectPartner(gossipManager.getDeadMembers()); sendMembershipList(gossipManager.getMyself(), member); } @@ -101,7 +101,7 @@ public class SimpleActiveGossipper extends AbstractActiveGossiper { * sends an optimistic shutdown message to several clusters nodes */ protected void sendShutdownMessage(){ - List<LocalGossipMember> l = gossipManager.getLiveMembers(); + List<LocalMember> l = gossipManager.getLiveMembers(); int sendTo = l.size() < 3 ? 1 : l.size() / 2; for (int i = 0; i < sendTo; i++) { threadService.execute(() -> sendShutdownMessage(gossipManager.getMyself(), selectPartner(l))); http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/51998219/src/main/java/org/apache/gossip/manager/UserDataPersister.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/UserDataPersister.java b/src/main/java/org/apache/gossip/manager/UserDataPersister.java index 4a8a415..3b9eafa 100644 --- a/src/main/java/org/apache/gossip/manager/UserDataPersister.java +++ b/src/main/java/org/apache/gossip/manager/UserDataPersister.java @@ -23,8 +23,8 @@ import java.io.FileOutputStream; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; -import org.apache.gossip.model.GossipDataMessage; -import org.apache.gossip.model.SharedGossipDataMessage; +import org.apache.gossip.model.PerNodeDataMessage; +import org.apache.gossip.model.SharedDataMessage; import org.apache.log4j.Logger; public class UserDataPersister implements Runnable { @@ -49,16 +49,16 @@ public class UserDataPersister implements Runnable { } @SuppressWarnings("unchecked") - ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>> readPerNodeFromDisk(){ + ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> readPerNodeFromDisk(){ if (!parent.getSettings().isPersistDataState()){ - return new ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>>(); + return new ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>>(); } try (FileInputStream fos = new FileInputStream(computePerNodeTarget())){ return parent.getObjectMapper().readValue(fos, ConcurrentHashMap.class); } catch (IOException e) { LOGGER.debug(e); } - return new ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>>(); + return new ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>>(); } void writePerNodeToDisk(){ @@ -84,16 +84,16 @@ public class UserDataPersister implements Runnable { } @SuppressWarnings("unchecked") - ConcurrentHashMap<String, SharedGossipDataMessage> readSharedDataFromDisk(){ + ConcurrentHashMap<String, SharedDataMessage> readSharedDataFromDisk(){ if (!parent.getSettings().isPersistRingState()){ - return new ConcurrentHashMap<String, SharedGossipDataMessage>(); + return new ConcurrentHashMap<String, SharedDataMessage>(); } try (FileInputStream fos = new FileInputStream(computeSharedTarget())){ return parent.getObjectMapper().readValue(fos, ConcurrentHashMap.class); } catch (IOException e) { LOGGER.debug(e); } - return new ConcurrentHashMap<String, SharedGossipDataMessage>(); + return new ConcurrentHashMap<String, SharedDataMessage>(); } /** http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/51998219/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java b/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java index 54aa40c..f5e568e 100644 --- a/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java +++ b/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java @@ -17,8 +17,8 @@ */ package org.apache.gossip.manager.handlers; -import org.apache.gossip.GossipMember; -import org.apache.gossip.RemoteGossipMember; +import org.apache.gossip.Member; +import org.apache.gossip.RemoteMember; import org.apache.gossip.manager.GossipCore; import org.apache.gossip.manager.GossipManager; import org.apache.gossip.model.Base; @@ -34,8 +34,8 @@ import java.util.List; public class ActiveGossipMessageHandler implements MessageHandler { @Override public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { - List<GossipMember> remoteGossipMembers = new ArrayList<>(); - RemoteGossipMember senderMember = null; + List<Member> remoteGossipMembers = new ArrayList<>(); + RemoteMember senderMember = null; UdpActiveGossipMessage activeGossipMessage = (UdpActiveGossipMessage) base; for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) { URI u; @@ -45,7 +45,7 @@ public class ActiveGossipMessageHandler implements MessageHandler { GossipCore.LOGGER.debug("Gossip message with faulty URI", e); continue; } - RemoteGossipMember member = new RemoteGossipMember( + RemoteMember member = new RemoteMember( activeGossipMessage.getMembers().get(i).getCluster(), u, activeGossipMessage.getMembers().get(i).getId(),
