Repository: incubator-gossip Updated Branches: refs/heads/master 4d2ea58ba -> 3ca8e0f9c
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/main/java/org/apache/gossip/GossipSettings.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/GossipSettings.java b/src/main/java/org/apache/gossip/GossipSettings.java new file mode 100644 index 0000000..99b5807 --- /dev/null +++ b/src/main/java/org/apache/gossip/GossipSettings.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip; + +/** + * In this object the settings used by the GossipService are held. + * + * @author harmenw + */ +public class GossipSettings { + + /** Time between gossip'ing in ms. Default is 1 second. */ + private int gossipInterval = 1000; + + /** Time between cleanups in ms. Default is 10 seconds. */ + private int cleanupInterval = 10000; + + /** + * Construct GossipSettings with default settings. + */ + public GossipSettings() { + } + + /** + * Construct GossipSettings with given settings. + * + * @param gossipInterval + * The gossip interval in ms. + * @param cleanupInterval + * The cleanup interval in ms. 
+ */ + public GossipSettings(int gossipInterval, int cleanupInterval) { + this.gossipInterval = gossipInterval; + this.cleanupInterval = cleanupInterval; + } + + /** + * Set the gossip interval. This is the time between two consecutive gossip messages being sent. + * Despite its name, this method assigns the same field that {@link #getGossipInterval()} returns. + * + * @param gossipInterval + * The gossip interval in ms. + */ + public void setGossipTimeout(int gossipInterval) { + this.gossipInterval = gossipInterval; + } + + /** + * Set the cleanup interval. This is the time between the last heartbeat received from a member + * and when it will be marked as dead. + * + * @param cleanupInterval + * The cleanup interval in ms. + */ + public void setCleanupInterval(int cleanupInterval) { + this.cleanupInterval = cleanupInterval; + } + + /** + * Get the gossip interval. + * + * @return The gossip interval in ms. + */ + public int getGossipInterval() { + return gossipInterval; + } + + /** + * Get the cleanup interval. + * + * @return The cleanup interval in ms. + */ + public int getCleanupInterval() { + return cleanupInterval; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/main/java/org/apache/gossip/GossipTimeoutTimer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/GossipTimeoutTimer.java b/src/main/java/org/apache/gossip/GossipTimeoutTimer.java new file mode 100644 index 0000000..2fa09c0 --- /dev/null +++ b/src/main/java/org/apache/gossip/GossipTimeoutTimer.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip; + +import java.util.Date; + +import javax.management.NotificationListener; +import javax.management.timer.Timer; + +/** + * This object represents a timer for a gossip member. When the timer has elapsed without being + * reset in the meantime, it will inform the GossipService about this, which in turn will put the + * gossip member on the dead list, because it is apparently not alive anymore. + * + * @author joshclemm, harmenw + */ +public class GossipTimeoutTimer extends Timer { + + private final long sleepTime; + + private final LocalGossipMember source; + + /** + * Constructor. Creates a reset-able timer that wakes up after millisecondsSleepTime. + * + * @param millisecondsSleepTime + * The time for this timer to wait before an event. + * @param notificationListener + * The listener registered on this timer to receive its notifications. + * @param member + * The gossip member that is attached to every scheduled notification as its user data. + */ + public GossipTimeoutTimer(long millisecondsSleepTime, NotificationListener notificationListener, + LocalGossipMember member) { + super(); + sleepTime = millisecondsSleepTime; + source = member; + addNotificationListener(notificationListener, null, null); + } + + /** + * Schedules a fresh wake-up notification via {@link #reset()} and then starts the timer. + * + * @see javax.management.timer.Timer#start() + */ + public void start() { + this.reset(); + super.start(); + } + + /** + * Resets timer to start counting down from original time, by removing all pending notifications + * and scheduling a new one. + */ + public void reset() { + removeAllNotifications(); + setWakeupTime(sleepTime); + } + + /** + * Adds a new wake-up time for this timer. 
+ * + * @param milliseconds + */ + private void setWakeupTime(long milliseconds) { + addNotification("type", "message", source, new Date(System.currentTimeMillis() + milliseconds)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/main/java/org/apache/gossip/LocalGossipMember.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/LocalGossipMember.java b/src/main/java/org/apache/gossip/LocalGossipMember.java new file mode 100644 index 0000000..55ce257 --- /dev/null +++ b/src/main/java/org/apache/gossip/LocalGossipMember.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip; + +import javax.management.NotificationListener; + +/** + * This object represents a gossip member with the properties known locally. These objects are stored + * in the local list of gossip members. + * + * @author harmenw + */ +public class LocalGossipMember extends GossipMember { + /** The timeout timer for this gossip member. */ + private final transient GossipTimeoutTimer timeoutTimer; + + /** + * Constructor. + * + * @param clusterName + * The cluster name, passed through to the GossipMember constructor. + * @param hostname + * The hostname or IP address. 
+ * @param port + * The port number. + * @param id + * @param heartbeat + * The current heartbeat. + * @param notificationListener + * @param cleanupTimeout + * The cleanup timeout for this gossip member. + */ + public LocalGossipMember(String clusterName, String hostname, int port, String id, + long heartbeat, NotificationListener notificationListener, int cleanupTimeout) { + super(clusterName, hostname, port, id, heartbeat); + + timeoutTimer = new GossipTimeoutTimer(cleanupTimeout, notificationListener, this); + } + + /** + * Start the timeout timer. + */ + public void startTimeoutTimer() { + timeoutTimer.start(); + } + + /** + * Reset the timeout timer. + */ + public void resetTimeoutTimer() { + timeoutTimer.reset(); + } + + public void disableTimer() { + timeoutTimer.removeAllNotifications(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/main/java/org/apache/gossip/RemoteGossipMember.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/RemoteGossipMember.java b/src/main/java/org/apache/gossip/RemoteGossipMember.java new file mode 100644 index 0000000..899da93 --- /dev/null +++ b/src/main/java/org/apache/gossip/RemoteGossipMember.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip; + +/** + * The object represents a gossip member with the properties as received from a remote gossip + * member. + * + * @author harmenw + */ +public class RemoteGossipMember extends GossipMember { + + /** + * Constructor. + * + * @param clusterName + * The cluster name, passed through to the GossipMember constructor. + * @param hostname + * The hostname or IP address. + * @param port + * The port number. + * @param id + * The member id, passed through to the GossipMember constructor. + * @param heartbeat + * The current heartbeat. + */ + public RemoteGossipMember(String clusterName, String hostname, int port, String id, long heartbeat) { + super(clusterName, hostname, port, id, heartbeat); + } + + /** + * Construct a RemoteGossipMember whose heartbeat is initialized to the current system time in + * ms (System.currentTimeMillis()), not to 0. + * + * @param clusterName + * The cluster name, passed through to the GossipMember constructor. + * @param hostname + * The hostname or IP address. + * @param port + * The port number. + * @param id + * The member id, passed through to the GossipMember constructor. + */ + public RemoteGossipMember(String clusterName, String hostname, int port, String id) { + super(clusterName, hostname, port, id, System.currentTimeMillis()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/main/java/org/apache/gossip/StartupSettings.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/StartupSettings.java b/src/main/java/org/apache/gossip/StartupSettings.java new file mode 100644 index 0000000..176a79b --- /dev/null +++ b/src/main/java/org/apache/gossip/StartupSettings.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.log4j.Logger; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; + +/** + * This object represents the settings used when starting the gossip service. + * + * @author harmenw + */ +public class StartupSettings { + private static final Logger log = Logger.getLogger(StartupSettings.class); + + /** The id to use for the service. */ + private String id; + + /** The port to start the gossip service on. */ + private int port; + /** The cluster value given at construction or through setCluster. */ + private String cluster; + + /** The gossip settings used at startup. */ + private final GossipSettings gossipSettings; + + /** The list with gossip members to start with. */ + private final List<GossipMember> gossipMembers; + + /** + * Constructor that uses a default-constructed GossipSettings object. + * + * @param id + * The id to be used for this service + * @param port + * The port to start the service on. + * @param logLevel + * unused + * @param cluster + * The cluster value to store for this service. + */ + public StartupSettings(String id, int port, int logLevel, String cluster) { + this(id, port, new GossipSettings(), cluster); + } + + /** + * Constructor. + * + * @param id + * The id to be used for this service + * @param port + * The port to start the service on. + * @param gossipSettings + * The gossip settings to use at startup. + * @param cluster + * The cluster value to store for this service. 
+ */ + public StartupSettings(String id, int port, GossipSettings gossipSettings, String cluster) { + this.id = id; + this.port = port; + this.gossipSettings = gossipSettings; + this.setCluster(cluster); + gossipMembers = new ArrayList<>(); + } + + public void setCluster(String cluster) { + this.cluster = cluster; + } + + public String getCluster() { + return cluster; + } + + /** + * Set the id to be used for this service. + * + * @param id + * The id for this service. + */ + public void setId(String id) { + this.id = id; + } + + /** + * Get the id for this service. + * + * @return the service's id. + */ + public String getId() { + return id; + } + + /** + * Set the port of the gossip service. + * + * @param port + * The port for the gossip service. + */ + public void setPort(int port) { + this.port = port; + } + + /** + * Get the port for the gossip service. + * + * @return The port of the gossip service. + */ + public int getPort() { + return port; + } + + /** + * Get the GossipSettings. + * + * @return The GossipSettings object. + */ + public GossipSettings getGossipSettings() { + return gossipSettings; + } + + /** + * Add a gossip member to the list of members to start with. + * + * @param member + * The member to add. + */ + public void addGossipMember(GossipMember member) { + gossipMembers.add(member); + } + + /** + * Get the list with gossip members. + * + * @return The gossip members. + */ + public List<GossipMember> getGossipMembers() { + return gossipMembers; + } + + /** + * Parse the settings for the gossip service from a JSON file. + * + * @param jsonFile + * The file object which refers to the JSON config file. + * @return The StartupSettings object with the settings from the config file. + * @throws JSONException + * Thrown when the file is not well-formed JSON. + * @throws FileNotFoundException + * Thrown when the file cannot be found. + * @throws IOException + * Thrown when reading the file gives problems. 
+ */ + public static StartupSettings fromJSONFile(File jsonFile) throws JSONException, + FileNotFoundException, IOException { + // Read the file to a String. + StringBuffer buffer = new StringBuffer(); + try (BufferedReader br = new BufferedReader(new FileReader(jsonFile)) ){ + String line; + while ((line = br.readLine()) != null) { + buffer.append(line.trim()); + } + } + + JSONObject jsonObject = new JSONArray(buffer.toString()).getJSONObject(0); + int port = jsonObject.getInt("port"); + String id = jsonObject.getString("id"); + int gossipInterval = jsonObject.getInt("gossip_interval"); + int cleanupInterval = jsonObject.getInt("cleanup_interval"); + String cluster = jsonObject.getString("cluster"); + if (cluster == null){ + throw new IllegalArgumentException("cluster was null. It is required"); + } + StartupSettings settings = new StartupSettings(id, port, new GossipSettings(gossipInterval, + cleanupInterval), cluster); + + // Now iterate over the members from the config file and add them to the settings. + String configMembersDetails = "Config-members ["; + JSONArray membersJSON = jsonObject.getJSONArray("members"); + for (int i = 0; i < membersJSON.length(); i++) { + JSONObject memberJSON = membersJSON.getJSONObject(i); + RemoteGossipMember member = new RemoteGossipMember(memberJSON.getString("cluster"), + memberJSON.getString("host"), memberJSON.getInt("port"), ""); + settings.addGossipMember(member); + configMembersDetails += member.getAddress(); + if (i < (membersJSON.length() - 1)) + configMembersDetails += ", "; + } + log.info(configMembersDetails + "]"); + + // Return the created settings object. 
+ return settings; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/main/java/org/apache/gossip/event/GossipListener.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/event/GossipListener.java b/src/main/java/org/apache/gossip/event/GossipListener.java new file mode 100644 index 0000000..2e882f6 --- /dev/null +++ b/src/main/java/org/apache/gossip/event/GossipListener.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gossip.event; + +import org.apache.gossip.GossipMember; + +public interface GossipListener { + void gossipEvent(GossipMember member, GossipState state); +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/main/java/org/apache/gossip/event/GossipState.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/event/GossipState.java b/src/main/java/org/apache/gossip/event/GossipState.java new file mode 100644 index 0000000..3b76c9e --- /dev/null +++ b/src/main/java/org/apache/gossip/event/GossipState.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gossip.event; + +public enum GossipState { + UP("up"), DOWN("down"); + @SuppressWarnings("unused") + private final String state; + + private GossipState(String state) { + this.state = state; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/main/java/org/apache/gossip/examples/GossipExample.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/examples/GossipExample.java b/src/main/java/org/apache/gossip/examples/GossipExample.java new file mode 100644 index 0000000..e953c77 --- /dev/null +++ b/src/main/java/org/apache/gossip/examples/GossipExample.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.examples; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.gossip.GossipMember; +import org.apache.gossip.GossipService; +import org.apache.gossip.GossipSettings; +import org.apache.gossip.RemoteGossipMember; + +/** + * This class is an example of how one could use the gossip service. 
Here we start multiple gossip + * clients on this host as specified in the config file. + * + * @author harmenw + */ +public class GossipExample extends Thread { + /** The number of clients to start. */ + private static final int NUMBER_OF_CLIENTS = 4; + + /** + * Creates a GossipExample; its constructor starts the thread. + * + * @param args + * Command line arguments; not used. + */ + public static void main(String[] args) { + new GossipExample(); + } + + /** + * Constructor. This starts this thread. + */ + public GossipExample() { + start(); + } + + /** + * Starts NUMBER_OF_CLIENTS gossip services on this host on ports 2000 and up, waits, and then + * calls shutdown on the first service. + * + * @see java.lang.Thread#run() + */ + public void run() { + try { + GossipSettings settings = new GossipSettings(); + + List<GossipService> clients = new ArrayList<>(); + + // Get my ip address. + String myIpAddress = InetAddress.getLocalHost().getHostAddress(); + + String cluster = "My Gossip Cluster"; + + // Create the gossip members and put them in a list and give them a port number starting with + // 2000. + List<GossipMember> startupMembers = new ArrayList<>(); + for (int i = 0; i < NUMBER_OF_CLIENTS; ++i) { + startupMembers.add(new RemoteGossipMember(cluster, myIpAddress, 2000 + i, "")); + } + + // Let's start the gossip clients. + // Start the clients, waiting the cleanup interval + 1 second between them which will show the + // dead list handling. + for (GossipMember member : startupMembers) { + GossipService gossipService = new GossipService(cluster, myIpAddress, member.getPort(), "", + startupMembers, settings, null); + clients.add(gossipService); + gossipService.start(); + sleep(settings.getCleanupInterval() + 1000); + } + + // After starting all gossip clients, first wait 10 seconds and then shut them down. + sleep(10000); + System.err.println("Going to shutdown all services..."); + // Since they all run in the same virtual machine and share the same executor, if one is + // shutdown they will all stop. + // NOTE(review): only the first service is shut down below; confirm in GossipService that the executor really is shared. 
+ clients.get(0).shutdown(); + + } catch (UnknownHostException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java new file mode 100644 index 0000000..b966fcb --- /dev/null +++ b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.gossip.GossipService; +import org.apache.gossip.LocalGossipMember; + +/** + * [The active thread: periodically send gossip request.] The class handles gossiping the membership + * list. This information is important to maintaining a common state among all the nodes, and is + * important for detecting failures. 
+ */ +abstract public class ActiveGossipThread implements Runnable { + /** The manager that supplies the settings, the local member and the member list. */ + protected final GossipManager gossipManager; + /** Loop flag read by run(); set to false by shutdown() or when the thread is interrupted. */ + private final AtomicBoolean keepRunning; + /** + * Creates the runnable for the given manager with the loop flag initially set to true. + * + * @param gossipManager + * The manager to read settings and members from. + */ + public ActiveGossipThread(GossipManager gossipManager) { + this.gossipManager = gossipManager; + this.keepRunning = new AtomicBoolean(true); + } + /** + * Sleeps for the configured gossip interval and then sends the membership list, repeating until + * the loop flag is cleared. An InterruptedException is logged and ends the loop. + */ + @Override + public void run() { + while (keepRunning.get()) { + try { + TimeUnit.MILLISECONDS.sleep(gossipManager.getSettings().getGossipInterval()); + sendMembershipList(gossipManager.getMyself(), gossipManager.getMemberList()); + } catch (InterruptedException e) { + GossipService.LOGGER.error(e); + keepRunning.set(false); + } + } + shutdown(); + } + /** + * Clears the loop flag so that run() leaves its loop the next time the flag is checked. + */ + public void shutdown() { + keepRunning.set(false); + } + + /** + * Performs the sending of the membership list. NOTE(review): the original comment states that + * this happens after our own heartbeat has been incremented; run() in this class does not do + * that, so presumably implementations handle it - confirm. + * + * @param me + * The local member, as returned by GossipManager#getMyself(). + * @param memberList + * The members returned by GossipManager#getMemberList(). + */ + abstract protected void sendMembershipList(LocalGossipMember me, + List<LocalGossipMember> memberList); + + /** + * Abstract method which should be implemented by a subclass. This method should return a member + * of the list to gossip with. + * + * @param memberList + * The list of members which are stored in the local list of members. + * @return The chosen LocalGossipMember to gossip with. + */ + abstract protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList); +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/main/java/org/apache/gossip/manager/GossipManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java new file mode 100644 index 0000000..80cadf7 --- /dev/null +++ b/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager; + +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.management.Notification; +import javax.management.NotificationListener; + +import org.apache.log4j.Logger; + +import org.apache.gossip.GossipMember; +import org.apache.gossip.GossipService; +import org.apache.gossip.GossipSettings; +import org.apache.gossip.LocalGossipMember; +import org.apache.gossip.event.GossipListener; +import org.apache.gossip.event.GossipState; + +public abstract class GossipManager extends Thread implements NotificationListener { + + public static final Logger LOGGER = Logger.getLogger(GossipManager.class); + + public static final int MAX_PACKET_SIZE = 102400; + + private final ConcurrentSkipListMap<LocalGossipMember, GossipState> members; + + private final LocalGossipMember me; + + private final GossipSettings settings; + + private final AtomicBoolean gossipServiceRunning; + + private final Class<? 
extends PassiveGossipThread> passiveGossipThreadClass; + + private final Class<? extends ActiveGossipThread> activeGossipThreadClass; + + private final GossipListener listener; + + private ActiveGossipThread activeGossipThread; + + private PassiveGossipThread passiveGossipThread; + + private ExecutorService gossipThreadExecutor; + + public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass, + Class<? extends ActiveGossipThread> activeGossipThreadClass, String cluster, + String address, int port, String id, GossipSettings settings, + List<GossipMember> gossipMembers, GossipListener listener) { + this.passiveGossipThreadClass = passiveGossipThreadClass; + this.activeGossipThreadClass = activeGossipThreadClass; + this.settings = settings; + me = new LocalGossipMember(cluster, address, port, id, System.currentTimeMillis(), this, + settings.getCleanupInterval()); + members = new ConcurrentSkipListMap<>(); + for (GossipMember startupMember : gossipMembers) { + if (!startupMember.equals(me)) { + LocalGossipMember member = new LocalGossipMember(startupMember.getClusterName(), + startupMember.getHost(), startupMember.getPort(), startupMember.getId(), + System.currentTimeMillis(), this, settings.getCleanupInterval()); + members.put(member, GossipState.UP); + GossipService.LOGGER.debug(member); + } + } + gossipThreadExecutor = Executors.newCachedThreadPool(); + gossipServiceRunning = new AtomicBoolean(true); + this.listener = listener; + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + public void run() { + GossipService.LOGGER.debug("Service has been shutdown..."); + } + })); + } + + /** + * All timers associated with a member will trigger this method when it goes off. The timer will + * go off if we have not heard from this member in <code> _settings.T_CLEANUP </code> time. 
+ */ + @Override + public void handleNotification(Notification notification, Object handback) { + LocalGossipMember deadMember = (LocalGossipMember) notification.getUserData(); + GossipService.LOGGER.debug("Dead member detected: " + deadMember); + members.put(deadMember, GossipState.DOWN); + if (listener != null) { + listener.gossipEvent(deadMember, GossipState.DOWN); + } + } + /** + * Marks the given member as UP. The timeout timer of every known member with the same id is + * disabled first, then the map entry for the member is replaced and the listener, if any, is + * notified with GossipState.UP. + * + * @param m + * The member to mark as UP. + */ + public void revivieMember(LocalGossipMember m) { + for (Entry<LocalGossipMember, GossipState> it : this.members.entrySet()) { + if (it.getKey().getId().equals(m.getId())) { + it.getKey().disableTimer(); + } + } + members.remove(m); + members.put(m, GossipState.UP); + if (listener != null) { + listener.gossipEvent(m, GossipState.UP); + } + } + /** + * Stores the given member with state UP and notifies the listener, if any, with GossipState.UP. + * + * @param m + * The member to store as UP. + */ + public void createOrRevivieMember(LocalGossipMember m) { + members.put(m, GossipState.UP); + if (listener != null) { + listener.gossipEvent(m, GossipState.UP); + } + } + /** + * @return the settings that were passed to the constructor of this manager + */ + public GossipSettings getSettings() { + return settings; + } + + /** + * Collects the members whose state is UP into a new list. + * + * @return a read only list of members found in the UP state + */ + public List<LocalGossipMember> getMemberList() { + List<LocalGossipMember> up = new ArrayList<>(); + for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()) { + if (GossipState.UP.equals(entry.getValue())) { + up.add(entry.getKey()); + } + } + return Collections.unmodifiableList(up); + } + /** + * @return the local member that was created in the constructor of this manager + */ + public LocalGossipMember getMyself() { + return me; + } + /** + * Collects the members whose state is DOWN into a new list. The local variable below is named + * <code>up</code>, but it only ever holds DOWN members. + * + * @return a read only list of members found in the DOWN state + */ + public List<LocalGossipMember> getDeadList() { + List<LocalGossipMember> up = new ArrayList<>(); + for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()) { + if (GossipState.DOWN.equals(entry.getValue())) { + up.add(entry.getKey()); + } + } + return Collections.unmodifiableList(up); + } + + /** + * Starts the client. Specifically, start the various cycles for this protocol. Start the gossip + * thread and start the receiver thread. 
+ */ + public void run() { + for (LocalGossipMember member : members.keySet()) { + if (member != me) { + member.startTimeoutTimer(); + } + } + try { + passiveGossipThread = passiveGossipThreadClass.getConstructor(GossipManager.class) + .newInstance(this); + gossipThreadExecutor.execute(passiveGossipThread); + activeGossipThread = activeGossipThreadClass.getConstructor(GossipManager.class) + .newInstance(this); + gossipThreadExecutor.execute(activeGossipThread); + } catch (InstantiationException | IllegalAccessException | IllegalArgumentException + | InvocationTargetException | NoSuchMethodException | SecurityException e1) { + throw new RuntimeException(e1); + } + GossipService.LOGGER.debug("The GossipService is started."); + while (gossipServiceRunning.get()) { + try { + // TODO + TimeUnit.MILLISECONDS.sleep(1); + } catch (InterruptedException e) { + GossipService.LOGGER.warn("The GossipClient was interrupted."); + } + } + } + + /** + * Shutdown the gossip service. + */ + public void shutdown() { + gossipServiceRunning.set(false); + gossipThreadExecutor.shutdown(); + if (passiveGossipThread != null) { + passiveGossipThread.shutdown(); + } + if (activeGossipThread != null) { + activeGossipThread.shutdown(); + } + try { + boolean result = gossipThreadExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS); + if (!result) { + LOGGER.error("executor shutdown timed out"); + } + } catch (InterruptedException e) { + LOGGER.error(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java new file mode 100644 index 0000000..bd7354e --- /dev/null +++ b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.SocketException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.gossip.GossipMember; +import org.apache.gossip.GossipService; +import org.apache.gossip.model.ActiveGossipMessage; +import org.apache.log4j.Logger; +import org.codehaus.jackson.map.ObjectMapper; +import org.apache.gossip.RemoteGossipMember; + +/** + * [The passive thread: reply to incoming gossip request.] This class handles the passive cycle, + * where this client has received an incoming message. For now, this message is always the + * membership list, but if you choose to gossip additional information, you will need some logic to + * determine the incoming message. + */ +abstract public class PassiveGossipThread implements Runnable { + + public static final Logger LOGGER = Logger.getLogger(PassiveGossipThread.class); + + /** The socket used for the passive thread of the gossip service. 
*/ + private final DatagramSocket server; + + private final GossipManager gossipManager; + + private final AtomicBoolean keepRunning; + + private final String cluster; + + private final ObjectMapper MAPPER = new ObjectMapper(); + + public PassiveGossipThread(GossipManager gossipManager) { + this.gossipManager = gossipManager; + try { + SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getHost(), + gossipManager.getMyself().getPort()); + server = new DatagramSocket(socketAddress); + GossipService.LOGGER.debug("Gossip service successfully initialized on port " + + gossipManager.getMyself().getPort()); + GossipService.LOGGER.debug("I am " + gossipManager.getMyself()); + cluster = gossipManager.getMyself().getClusterName(); + if (cluster == null){ + throw new IllegalArgumentException("cluster was null"); + } + } catch (SocketException ex) { + GossipService.LOGGER.warn(ex); + throw new RuntimeException(ex); + } + keepRunning = new AtomicBoolean(true); + } + + @Override + public void run() { + while (keepRunning.get()) { + try { + byte[] buf = new byte[server.getReceiveBufferSize()]; + DatagramPacket p = new DatagramPacket(buf, buf.length); + server.receive(p); + int packet_length = 0; + for (int i = 0; i < 4; i++) { + int shift = (4 - 1 - i) * 8; + packet_length += (buf[i] & 0x000000FF) << shift; + } + if (packet_length <= GossipManager.MAX_PACKET_SIZE) { + byte[] json_bytes = new byte[packet_length]; + for (int i = 0; i < packet_length; i++) { + json_bytes[i] = buf[i + 4]; + } + if (GossipService.LOGGER.isDebugEnabled()){ + String receivedMessage = new String(json_bytes); + GossipService.LOGGER.debug("Received message (" + packet_length + " bytes): " + + receivedMessage); + } + try { + List<GossipMember> remoteGossipMembers = new ArrayList<>(); + RemoteGossipMember senderMember = null; + ActiveGossipMessage activeGossipMessage = MAPPER.readValue(json_bytes, + ActiveGossipMessage.class); + for (int i = 0; i < 
activeGossipMessage.getMembers().size(); i++) { + RemoteGossipMember member = new RemoteGossipMember( + activeGossipMessage.getMembers().get(i).getCluster(), + activeGossipMessage.getMembers().get(i).getHost(), + activeGossipMessage.getMembers().get(i).getPort(), + activeGossipMessage.getMembers().get(i).getId(), + activeGossipMessage.getMembers().get(i).getHeartbeat()); + if (!(member.getClusterName().equals(cluster))){ + GossipService.LOGGER.warn("Note a member of this cluster " + i); + continue; + } + // This is the first member found, so this should be the member who is communicating + // with me. + if (i == 0) { + senderMember = member; + } + remoteGossipMembers.add(member); + } + mergeLists(gossipManager, senderMember, remoteGossipMembers); + } catch (RuntimeException ex) { + GossipService.LOGGER.error("Unable to process message", ex); + } + } else { + GossipService.LOGGER + .error("The received message is not of the expected size, it has been dropped."); + } + + } catch (IOException e) { + GossipService.LOGGER.error(e); + System.out.println(e); + keepRunning.set(false); + } + } + shutdown(); + } + + public void shutdown() { + try { + server.close(); + } catch (RuntimeException ex) { + } + } + + /** + * Abstract method for merging the local and remote list. + * + * @param gossipManager + * The GossipManager for retrieving the local members and dead members list. + * @param senderMember + * The member who is sending this list, this could be used to send a response if the + * remote list contains out-dated information. + * @param remoteList + * The list of members known at the remote side. + */ + abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, + List<GossipMember> remoteList); +} + +/* + * random comments // Check whether the package is smaller than the maximal packet length. 
// A + * package larger than this could not have been sent by a GossipService, // since this is + * checked before sending the message. // This can normally only occur when the list of members is + * very big, // or when the packet is malformed and the first 4 bytes no longer hold a valid length. + * // For this reason we disregard the message. + */ http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java new file mode 100644 index 0000000..edf21f3 --- /dev/null +++ b/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.gossip.manager.impl; + +import java.util.List; + +import org.apache.gossip.GossipMember; +import org.apache.gossip.GossipService; +import org.apache.gossip.LocalGossipMember; +import org.apache.gossip.RemoteGossipMember; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.PassiveGossipThread; + +public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread { + + public OnlyProcessReceivedPassiveGossipThread(GossipManager gossipManager) { + super(gossipManager); + } + + /** + * Merge remote list (received from peer), and our local member list. Simply, we must update the + * heartbeats that the remote list has with our list. Also, some additional logic is needed to + * make sure we have not timed out a member and then immediately received a list with that member. + * + * @param gossipManager + * @param senderMember + * @param remoteList + */ + protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, + List<GossipMember> remoteList) { + + // if the person sending to us is in the dead list consider them up + for (LocalGossipMember i : gossipManager.getDeadList()) { + if (i.getId().equals(senderMember.getId())) { + System.out.println(gossipManager.getMyself() + " caught a live one!"); + LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getClusterName(), + senderMember.getHost(), senderMember.getPort(), senderMember.getId(), + senderMember.getHeartbeat(), gossipManager, gossipManager.getSettings() + .getCleanupInterval()); + gossipManager.revivieMember(newLocalMember); + newLocalMember.startTimeoutTimer(); + } + } + for (GossipMember remoteMember : remoteList) { + if (remoteMember.getId().equals(gossipManager.getMyself().getId())) { + continue; + } + if (gossipManager.getMemberList().contains(remoteMember)) { + LocalGossipMember localMember = gossipManager.getMemberList().get( + gossipManager.getMemberList().indexOf(remoteMember)); + if 
(remoteMember.getHeartbeat() > localMember.getHeartbeat()) { + localMember.setHeartbeat(remoteMember.getHeartbeat()); + localMember.resetTimeoutTimer(); + } + } else if (!gossipManager.getMemberList().contains(remoteMember) + && !gossipManager.getDeadList().contains(remoteMember)) { + LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(), + remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(), + remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings() + .getCleanupInterval()); + gossipManager.createOrRevivieMember(newLocalMember); + newLocalMember.startTimeoutTimer(); + } else { + if (gossipManager.getDeadList().contains(remoteMember)) { + LocalGossipMember localDeadMember = gossipManager.getDeadList().get( + gossipManager.getDeadList().indexOf(remoteMember)); + if (remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) { + LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(), + remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(), + remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings() + .getCleanupInterval()); + gossipManager.revivieMember(newLocalMember); + newLocalMember.startTimeoutTimer(); + GossipService.LOGGER.debug("Removed remote member " + remoteMember.getAddress() + + " from dead list and added to local member list."); + } else { + GossipService.LOGGER.debug("me " + gossipManager.getMyself()); + GossipService.LOGGER.debug("sender " + senderMember); + GossipService.LOGGER.debug("remote " + remoteList); + GossipService.LOGGER.debug("live " + gossipManager.getMemberList()); + GossipService.LOGGER.debug("dead " + gossipManager.getDeadList()); + } + } else { + GossipService.LOGGER.debug("me " + gossipManager.getMyself()); + GossipService.LOGGER.debug("sender " + senderMember); + GossipService.LOGGER.debug("remote " + remoteList); + GossipService.LOGGER.debug("live " + gossipManager.getMemberList()); + 
GossipService.LOGGER.debug("dead " + gossipManager.getDeadList()); + // throw new IllegalArgumentException("wtf"); + } + } + } + } + +} + +/** + * old comment section: // If a member is restarted the heartbeat will restart from 1, so we should + * check // that here. // So a member can become from the dead when it is either larger than a + * previous // heartbeat (due to network failure) // or when the heartbeat is 1 (after a restart of + * the service). // TODO: What if the first message of a gossip service is sent to a dead node? The + * // second member will receive a heartbeat of two. // TODO: The above does happen. Maybe a special + * message for a revived member? // TODO: Or maybe when a member is declared dead for more than // + * _settings.getCleanupInterval() ms, reset the heartbeat to 0. // It will then accept a revived + * member. // The above is now handle by checking whether the heartbeat differs // + * _settings.getCleanupInterval(), it must be restarted. + */ + +/* + * // The remote member is back from the dead. // Remove it from the dead list. // + * gossipManager.getDeadList().remove(localDeadMember); // Add it as a new member and add it to the + * member list. + */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/main/java/org/apache/gossip/manager/impl/SendMembersActiveGossipThread.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/impl/SendMembersActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/impl/SendMembersActiveGossipThread.java new file mode 100644 index 0000000..16d0d32 --- /dev/null +++ b/src/main/java/org/apache/gossip/manager/impl/SendMembersActiveGossipThread.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager.impl; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.util.List; + +import org.apache.gossip.GossipService; +import org.apache.gossip.LocalGossipMember; +import org.apache.gossip.manager.ActiveGossipThread; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.model.ActiveGossipMessage; +import org.apache.gossip.model.GossipMember; +import org.codehaus.jackson.map.ObjectMapper; + +abstract public class SendMembersActiveGossipThread extends ActiveGossipThread { + + protected ObjectMapper om = new ObjectMapper(); + + public SendMembersActiveGossipThread(GossipManager gossipManager) { + super(gossipManager); + } + + private GossipMember convert(LocalGossipMember member){ + GossipMember gm = new GossipMember(); + gm.setCluster(member.getClusterName()); + gm.setHeartbeat(member.getHeartbeat()); + gm.setHost(member.getHost()); + gm.setId(member.getId()); + gm.setPort(member.getPort()); + return gm; + } + + /** + * Performs the sending of the membership list, after we have incremented our own heartbeat. 
+ */ + protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) { + GossipService.LOGGER.debug("Send sendMembershipList() is called."); + me.setHeartbeat(System.currentTimeMillis()); + LocalGossipMember member = selectPartner(memberList); + if (member == null) { + return; + } + try (DatagramSocket socket = new DatagramSocket()) { + socket.setSoTimeout(gossipManager.getSettings().getGossipInterval()); + InetAddress dest = InetAddress.getByName(member.getHost()); + ActiveGossipMessage message = new ActiveGossipMessage(); + message.getMembers().add(convert(me)); + for (LocalGossipMember other : memberList) { + message.getMembers().add(convert(other)); + } + byte[] json_bytes = om.writeValueAsString(message).getBytes(); + int packet_length = json_bytes.length; + if (packet_length < GossipManager.MAX_PACKET_SIZE) { + byte[] buf = createBuffer(packet_length, json_bytes); + DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, member.getPort()); + socket.send(datagramPacket); + } else { + GossipService.LOGGER.error("The length of the to be send message is too large (" + + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ")."); + } + } catch (IOException e1) { + GossipService.LOGGER.warn(e1); + } + } + + private byte[] createBuffer(int packetLength, byte[] jsonBytes) { + byte[] lengthBytes = new byte[4]; + lengthBytes[0] = (byte) (packetLength >> 24); + lengthBytes[1] = (byte) ((packetLength << 8) >> 24); + lengthBytes[2] = (byte) ((packetLength << 16) >> 24); + lengthBytes[3] = (byte) ((packetLength << 24) >> 24); + ByteBuffer byteBuffer = ByteBuffer.allocate(4 + jsonBytes.length); + byteBuffer.put(lengthBytes); + byteBuffer.put(jsonBytes); + byte[] buf = byteBuffer.array(); + return buf; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java ---------------------------------------------------------------------- 
diff --git a/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java new file mode 100644 index 0000000..23a41f5 --- /dev/null +++ b/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager.random; + +import java.util.List; +import java.util.Random; + +import org.apache.gossip.GossipService; +import org.apache.gossip.LocalGossipMember; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.impl.SendMembersActiveGossipThread; + +public class RandomActiveGossipThread extends SendMembersActiveGossipThread { + + /** The Random used for choosing a member to gossip with. */ + private final Random random; + + public RandomActiveGossipThread(GossipManager gossipManager) { + super(gossipManager); + random = new Random(); + } + + /** + * [The selectToSend() function.] Find a random peer from the local membership list. In the case + * where this client is the only member in the list, this method will return null. 
+ * + * @return Member random member if list is greater than 1, null otherwise + */ + protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) { + LocalGossipMember member = null; + if (memberList.size() > 0) { + int randomNeighborIndex = random.nextInt(memberList.size()); + member = memberList.get(randomNeighborIndex); + } else { + GossipService.LOGGER.debug("I am alone in this world."); + } + return member; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java new file mode 100644 index 0000000..0122610 --- /dev/null +++ b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gossip.manager.random; + +import org.apache.gossip.GossipMember; +import org.apache.gossip.GossipSettings; +import org.apache.gossip.event.GossipListener; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread; + +import java.util.List; + +public class RandomGossipManager extends GossipManager { + public RandomGossipManager(String cluster, String address, int port, String id, + GossipSettings settings, List<GossipMember> gossipMembers, GossipListener listener) { + super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, cluster, + address, port, id, settings, gossipMembers, listener); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/main/java/org/apache/gossip/model/ActiveGossipMessage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/model/ActiveGossipMessage.java b/src/main/java/org/apache/gossip/model/ActiveGossipMessage.java new file mode 100644 index 0000000..ac940d8 --- /dev/null +++ b/src/main/java/org/apache/gossip/model/ActiveGossipMessage.java @@ -0,0 +1,22 @@ +package org.apache.gossip.model; + +import java.util.ArrayList; +import java.util.List; + +public class ActiveGossipMessage { + + private List<GossipMember> members = new ArrayList<>(); + + public ActiveGossipMessage(){ + + } + + public List<GossipMember> getMembers() { + return members; + } + + public void setMembers(List<GossipMember> members) { + this.members = members; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/main/java/org/apache/gossip/model/GossipMember.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/model/GossipMember.java b/src/main/java/org/apache/gossip/model/GossipMember.java new file mode 100644 index 0000000..8dc6bf7 --- /dev/null +++ 
/**
 * Wire representation of one cluster member as carried inside a gossip message. All properties
 * are nullable boxed values so that an absent field stays distinguishable from a zero value.
 */
public class GossipMember {

  private String cluster;
  private String host;
  private Integer port;
  private String id;
  private Long heartbeat;

  /** Creates an instance with every property unset ({@code null}). */
  public GossipMember() {

  }

  /**
   * Creates a fully populated instance.
   *
   * @param cluster
   *          name of the cluster the member belongs to
   * @param host
   *          host of the member
   * @param port
   *          port of the member
   * @param id
   *          id of the member
   * @param heartbeat
   *          heartbeat value of the member
   */
  public GossipMember(String cluster, String host, Integer port, String id, Long heartbeat) {
    this.cluster = cluster;
    this.host = host;
    this.port = port;
    this.id = id;
    // Previously this argument was accepted but never stored, leaving getHeartbeat() null.
    this.heartbeat = heartbeat;
  }

  public String getCluster() {
    return cluster;
  }

  public void setCluster(String cluster) {
    this.cluster = cluster;
  }

  public String getHost() {
    return host;
  }

  public void setHost(String host) {
    this.host = host;
  }

  public Integer getPort() {
    return port;
  }

  public void setPort(Integer port) {
    this.port = port;
  }

  public String getId() {
    return id;
  }

  public void setId(String id) {
    this.id = id;
  }

  public Long getHeartbeat() {
    return heartbeat;
  }

  public void setHeartbeat(Long heartbeat) {
    this.heartbeat = heartbeat;
  }

}
org.apache.gossip.GossipMember; +import org.apache.gossip.GossipService; +import org.apache.gossip.GossipSettings; +import org.apache.gossip.RemoteGossipMember; +import org.apache.gossip.event.GossipListener; +import org.apache.gossip.event.GossipState; public class ShutdownDeadtimeTest { http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/test/java/io/teknek/gossip/StartupSettingsTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/io/teknek/gossip/StartupSettingsTest.java b/src/test/java/io/teknek/gossip/StartupSettingsTest.java index bf6710e..aa4e404 100644 --- a/src/test/java/io/teknek/gossip/StartupSettingsTest.java +++ b/src/test/java/io/teknek/gossip/StartupSettingsTest.java @@ -17,10 +17,10 @@ */ package io.teknek.gossip; -import com.google.code.gossip.GossipMember; -import com.google.code.gossip.GossipService; -import com.google.code.gossip.GossipSettings; -import com.google.code.gossip.StartupSettings; +import org.apache.gossip.GossipMember; +import org.apache.gossip.GossipService; +import org.apache.gossip.GossipSettings; +import org.apache.gossip.StartupSettings; import org.apache.log4j.Logger; import org.json.JSONException; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java b/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java index 277d0fe..4e731ae 100644 --- a/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java +++ b/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java @@ -30,12 +30,12 @@ import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; import org.junit.Test; -import com.google.code.gossip.GossipMember; -import com.google.code.gossip.GossipService; -import com.google.code.gossip.GossipSettings; -import 
com.google.code.gossip.RemoteGossipMember; -import com.google.code.gossip.event.GossipListener; -import com.google.code.gossip.event.GossipState; +import org.apache.gossip.GossipMember; +import org.apache.gossip.GossipService; +import org.apache.gossip.GossipSettings; +import org.apache.gossip.RemoteGossipMember; +import org.apache.gossip.event.GossipListener; +import org.apache.gossip.event.GossipState; public class TenNodeThreeSeedTest { private static final Logger log = Logger.getLogger( TenNodeThreeSeedTest.class );
