Repository: incubator-gossip Updated Branches: refs/heads/master 9a1af76df -> 375cee2a3
GOSSIP-15 avoid busy loop (ChiaHung Lin via egc) Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/375cee2a Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/375cee2a Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/375cee2a Branch: refs/heads/master Commit: 375cee2a3b899931e3aa5372da1199f49484b44c Parents: 9a1af76 Author: Edward Capriolo <edlinuxg...@gmail.com> Authored: Fri Sep 23 22:13:40 2016 -0400 Committer: Edward Capriolo <edlinuxg...@gmail.com> Committed: Fri Sep 23 22:13:40 2016 -0400 ---------------------------------------------------------------------- .../java/org/apache/gossip/GossipService.java | 4 +- .../gossip/manager/ActiveGossipThread.java | 124 ++++++++++++++----- .../apache/gossip/manager/GossipManager.java | 17 +-- .../random/RandomActiveGossipThread.java | 120 ------------------ 4 files changed, 99 insertions(+), 166 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/375cee2a/src/main/java/org/apache/gossip/GossipService.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/GossipService.java b/src/main/java/org/apache/gossip/GossipService.java index ce15992..68a4ca2 100644 --- a/src/main/java/org/apache/gossip/GossipService.java +++ b/src/main/java/org/apache/gossip/GossipService.java @@ -70,8 +70,8 @@ public class GossipService { } public void start() { - LOGGER.debug("Starting: " + gossipManager.getName() + " - " + get_gossipManager().getMyself().getUri()); - gossipManager.start(); + LOGGER.debug("Starting: " + get_gossipManager().getMyself().getUri()); + gossipManager.init(); } public void shutdown() { http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/375cee2a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java index 181d9ae..b57c25a 100644 --- a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java @@ -17,59 +17,103 @@ */ package org.apache.gossip.manager; +import java.io.IOException; +import java.net.DatagramSocket; import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.gossip.GossipService; import org.apache.gossip.LocalGossipMember; +import org.apache.gossip.model.ActiveGossipOk; +import org.apache.gossip.model.GossipMember; +import org.apache.gossip.model.Response; +import org.apache.gossip.udp.UdpActiveGossipMessage; +import org.apache.log4j.Logger; +import org.codehaus.jackson.map.ObjectMapper; /** * [The active thread: periodically send gossip request.] The class handles gossiping the membership * list. This information is important to maintaining a common state among all the nodes, and is * important for detecting failures. */ -abstract public class ActiveGossipThread implements Runnable { +public class ActiveGossipThread { + public static final Logger LOGGER = Logger.getLogger(ActiveGossipThread.class); + + private ScheduledExecutorService scheduledExecutorService ; + private ObjectMapper MAPPER = new ObjectMapper(); + private final Random random; protected final GossipManager gossipManager; + private final GossipCore gossipCore; - private final AtomicBoolean keepRunning; - - public ActiveGossipThread(GossipManager gossipManager) { + public ActiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) { this.gossipManager = gossipManager; - this.keepRunning = new AtomicBoolean(true); + this.scheduledExecutorService = Executors.newScheduledThreadPool(1024); + this.gossipCore = gossipCore; + this.random = new Random(); } - @Override - public void run() { - while (keepRunning.get()) { - try { - TimeUnit.MILLISECONDS.sleep(gossipManager.getSettings().getGossipInterval()); - - // contact a live member. - sendMembershipList(gossipManager.getMyself(), gossipManager.getLiveMembers()); - - // contact a dead member. - sendMembershipList(gossipManager.getMyself(), gossipManager.getDeadMembers()); - - } catch (InterruptedException e) { - GossipService.LOGGER.error(e); - keepRunning.set(false); - } - } - shutdown(); + public void init() { + scheduledExecutorService.scheduleAtFixedRate( + () -> sendMembershipList(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0, + gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); + scheduledExecutorService.scheduleAtFixedRate( + () -> sendMembershipList(gossipManager.getMyself(), gossipManager.getDeadMembers()), 0, + gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); } - + public void shutdown() { - keepRunning.set(false); + this.scheduledExecutorService.shutdown(); + try { + this.scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.warn("Did not complete shutdown", e); + } } /** * Performs the sending of the membership list, after we have incremented our own heartbeat. */ - abstract protected void sendMembershipList(LocalGossipMember me, - List<LocalGossipMember> memberList); - + protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) { + + me.setHeartbeat(System.currentTimeMillis()); + LocalGossipMember member = selectPartner(memberList); + if (member == null) { + GossipService.LOGGER.debug("Send sendMembershipList() is called without action"); + return; + } else { + GossipService.LOGGER.debug("Send sendMembershipList() is called to " + member.toString()); + } + + try (DatagramSocket socket = new DatagramSocket()) { + socket.setSoTimeout(gossipManager.getSettings().getGossipInterval()); + UdpActiveGossipMessage message = new UdpActiveGossipMessage(); + message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString()); + message.setUuid(UUID.randomUUID().toString()); + message.getMembers().add(convert(me)); + for (LocalGossipMember other : memberList) { + message.getMembers().add(convert(other)); + } + byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes(); + int packet_length = json_bytes.length; + if (packet_length < GossipManager.MAX_PACKET_SIZE) { + Response r = gossipCore.send(message, member.getUri()); + if (r instanceof ActiveGossipOk){ + //maybe count metrics here + } else { + LOGGER.warn("Message "+ message + " generated response "+ r); + } + } else { + GossipService.LOGGER.error("The length of the to be send message is too large (" + + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ")."); + } + } catch (IOException e1) { + GossipService.LOGGER.warn(e1); + } + } /** * Abstract method which should be implemented by a subclass. This method should return a member * of the list to gossip with. @@ -78,5 +122,23 @@ abstract public class ActiveGossipThread implements Runnable { * The list of members which are stored in the local list of members. * @return The chosen LocalGossipMember to gossip with. */ - abstract protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList); + protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) { + LocalGossipMember member = null; + if (memberList.size() > 0) { + int randomNeighborIndex = random.nextInt(memberList.size()); + member = memberList.get(randomNeighborIndex); + } else { + LOGGER.debug("I am alone in this world."); + } + return member; + } + + private GossipMember convert(LocalGossipMember member){ + GossipMember gm = new GossipMember(); + gm.setCluster(member.getClusterName()); + gm.setHeartbeat(member.getHeartbeat()); + gm.setUri(member.getUri().toASCIIString()); + gm.setId(member.getId()); + return gm; + } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/375cee2a/src/main/java/org/apache/gossip/manager/GossipManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java index 79be431..0b2cfd2 100644 --- a/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -40,9 +40,8 @@ import org.apache.gossip.LocalGossipMember; import org.apache.gossip.event.GossipListener; import org.apache.gossip.event.GossipState; import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread; -import org.apache.gossip.manager.random.RandomActiveGossipThread; -public abstract class GossipManager extends Thread implements NotificationListener { +public abstract class GossipManager implements NotificationListener { public static final Logger LOGGER = Logger.getLogger(GossipManager.class); @@ -179,7 +178,7 @@ public abstract class GossipManager extends Thread implements NotificationListen * Starts the client. Specifically, start the various cycles for this protocol. Start the gossip * thread and start the receiver thread. */ - public void run() { + public void init() { for (LocalGossipMember member : members.keySet()) { if (member != me) { member.startTimeoutTimer(); @@ -187,17 +186,9 @@ public abstract class GossipManager extends Thread implements NotificationListen } passiveGossipThread = new OnlyProcessReceivedPassiveGossipThread(this, gossipCore); gossipThreadExecutor.execute(passiveGossipThread); - activeGossipThread = new RandomActiveGossipThread(this, this.gossipCore); - gossipThreadExecutor.execute(activeGossipThread); + activeGossipThread = new ActiveGossipThread(this, this.gossipCore); + activeGossipThread.init(); GossipService.LOGGER.debug("The GossipService is started."); - while (gossipServiceRunning.get()) { - try { - // TODO - TimeUnit.MILLISECONDS.sleep(1); - } catch (InterruptedException e) { - GossipService.LOGGER.warn("The GossipClient was interrupted."); - } - } } /** http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/375cee2a/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java deleted file mode 100644 index 03d550c..0000000 --- a/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gossip.manager.random; - -import java.io.IOException; -import java.net.DatagramSocket; -import java.net.InetAddress; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Random; -import java.util.UUID; - -import org.apache.gossip.GossipService; -import org.apache.gossip.LocalGossipMember; -import org.apache.gossip.manager.ActiveGossipThread; -import org.apache.gossip.manager.GossipCore; -import org.apache.gossip.manager.GossipManager; -import org.apache.gossip.model.ActiveGossipOk; -import org.apache.gossip.model.GossipMember; -import org.apache.gossip.model.Response; -import org.apache.gossip.udp.UdpActiveGossipMessage; -import org.apache.log4j.Logger; -import org.codehaus.jackson.map.ObjectMapper; - -public class RandomActiveGossipThread extends ActiveGossipThread { - - public static final Logger LOGGER = Logger.getLogger(RandomActiveGossipThread.class); - protected ObjectMapper MAPPER = new ObjectMapper(); - - /** The Random used for choosing a member to gossip with. */ - private final Random random; - private final GossipCore gossipCore; - - public RandomActiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) { - super(gossipManager); - random = new Random(); - this.gossipCore = gossipCore; - } - - /** - * [The selectToSend() function.] Find a random peer from the local membership list. In the case - * where this client is the only member in the list, this method will return null. - * - * @return Member random member if list is greater than 1, null otherwise - */ - protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) { - LocalGossipMember member = null; - if (memberList.size() > 0) { - int randomNeighborIndex = random.nextInt(memberList.size()); - member = memberList.get(randomNeighborIndex); - } else { - GossipService.LOGGER.debug("I am alone in this world."); - - } - return member; - } - - protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) { - - me.setHeartbeat(System.currentTimeMillis()); - LocalGossipMember member = selectPartner(memberList); - if (member == null) { - GossipService.LOGGER.debug("Send sendMembershipList() is called without action"); - return; - } else { - GossipService.LOGGER.debug("Send sendMembershipList() is called to " + member.toString()); - } - - try (DatagramSocket socket = new DatagramSocket()) { - socket.setSoTimeout(gossipManager.getSettings().getGossipInterval()); - UdpActiveGossipMessage message = new UdpActiveGossipMessage(); - message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString()); - message.setUuid(UUID.randomUUID().toString()); - message.getMembers().add(convert(me)); - for (LocalGossipMember other : memberList) { - message.getMembers().add(convert(other)); - } - byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes(); - int packet_length = json_bytes.length; - if (packet_length < GossipManager.MAX_PACKET_SIZE) { - Response r = gossipCore.send(message, member.getUri()); - if (r instanceof ActiveGossipOk){ - //maybe count metrics here - } else { - LOGGER.warn("Message "+ message + " generated response "+ r); - } - } else { - GossipService.LOGGER.error("The length of the to be send message is too large (" - + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ")."); - } - } catch (IOException e1) { - GossipService.LOGGER.warn(e1); - } - } - - private GossipMember convert(LocalGossipMember member){ - GossipMember gm = new GossipMember(); - gm.setCluster(member.getClusterName()); - gm.setHeartbeat(member.getHeartbeat()); - gm.setUri(member.getUri().toASCIIString()); - gm.setId(member.getId()); - return gm; - } - -}