GEODE-1874: Changed setNextNeighbor to not create a HashSet for every p2p invocation
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/358fca74 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/358fca74 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/358fca74 Branch: refs/heads/feature/GEODE-1874 Commit: 358fca74568a33c40ffbfd741c85b996e9c69769 Parents: e130e5b Author: Udo Kohlmeyer <[email protected]> Authored: Wed Oct 12 11:54:33 2016 +1100 Committer: Udo Kohlmeyer <[email protected]> Committed: Wed Oct 19 08:41:29 2016 -0700 ---------------------------------------------------------------------- .../membership/gms/fd/GMSHealthMonitor.java | 394 ++++++++++--------- 1 file changed, 210 insertions(+), 184 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/358fca74/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java index aafb498..97a413c 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java @@ -16,9 +16,7 @@ */ package org.apache.geode.distributed.internal.membership.gms.fd; -import static org.apache.geode.internal.DataSerializableFixedID.HEARTBEAT_REQUEST; -import static org.apache.geode.internal.DataSerializableFixedID.HEARTBEAT_RESPONSE; -import static org.apache.geode.internal.DataSerializableFixedID.SUSPECT_MEMBERS_MESSAGE; +import static org.apache.geode.internal.DataSerializableFixedID.*; import java.io.DataInputStream; import 
java.io.DataOutputStream; @@ -29,7 +27,19 @@ import java.net.InetAddress; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketTimeoutException; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; @@ -40,7 +50,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.*; +import java.util.stream.Collectors; import org.apache.logging.log4j.Logger; import org.jgroups.util.UUID; @@ -69,7 +79,7 @@ import org.apache.geode.internal.security.SecurableCommunicationChannel; /** * Failure Detection - * + * <p> * This class make sure that each member is alive and communicating to this member. * To make sure that we create the ring of members based on current view. On this * ring, each member make sure that next-member in ring is communicating with it. @@ -77,17 +87,16 @@ import org.apache.geode.internal.security.SecurableCommunicationChannel; * member has not communicated in last period(member-timeout) then we check whether * this member is still alive or not. Based on that we informed probable coordinators * to remove that member from view. - * + * <p> * It has {@link #suspect(InternalDistributedMember, String)} api, which can be used * to initiate suspect processing for any member. First is checks whether the member is * responding or not. Then it informs probable coordinators to remove that member from * view. 
- * + * <p> * It has {@link #checkIfAvailable(DistributedMember, String, boolean)} api to see * if that member is alive. Then based on removal flag it initiates the suspect processing * for that member. - * - * */ + */ @SuppressWarnings({ "SynchronizationOnLocalVariableOrMethodParameter", "NullableProblems" }) public class GMSHealthMonitor implements HealthMonitor, MessageHandler { @@ -99,9 +108,11 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { volatile private boolean isStopping = false; private final AtomicInteger requestId = new AtomicInteger(); - /** membership logger */ + /** + * membership logger + */ private static final Logger logger = Services.getLogger(); - + /** * The number of recipients of periodic heartbeats. The recipients will * be selected from the members that are likely to be monitoring this member. @@ -115,24 +126,28 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { */ public static final int LOGICAL_INTERVAL = Integer.getInteger("geode.logical-message-received-interval", 2); - /** stall time to wait for members leaving concurrently */ + /** + * stall time to wait for members leaving concurrently + */ public static final long MEMBER_SUSPECT_COLLECTION_INTERVAL = Long.getLong("geode.suspect-member-collection-interval", 200); private volatile long currentTimeStamp; - - /** this member's ID */ + + /** + * this member's ID + */ private InternalDistributedMember localAddress; /** * Timestamp at which we last had contact from a member */ final ConcurrentMap<InternalDistributedMember, TimeStamp> memberTimeStamps = new ConcurrentHashMap<>(); - + /** * Members currently being suspected and the view they were suspected in */ final private ConcurrentHashMap<InternalDistributedMember, NetView> suspectedMemberInView = new ConcurrentHashMap<>(); - + /** * Members undergoing final checks */ @@ -142,7 +157,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { * Replies to messages */ 
final private Map<Integer, Response> requestIdVsResponse = new ConcurrentHashMap<>(); - + /** * Members suspected in a particular view */ @@ -156,29 +171,36 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { * to stop check scheduler */ private ScheduledFuture<?> monitorFuture; - - /** test hook */ + + /** + * test hook + */ private volatile boolean playingDead = false; - /** test hook */ + /** + * test hook + */ private volatile boolean beingSick = false; - + // For TCP check private ExecutorService serverSocketExecutor; static final int OK = 0x7B; - static final int ERROR = 0x00; + static final int ERROR = 0x00; private volatile int socketPort; private volatile ServerSocket serverSocket; - - /** Statistics about health monitor */ + + /** + * Statistics about health monitor + */ private DMStats stats; /** * this class is to avoid garbage */ private static class TimeStamp { + private volatile long timeStamp; - + TimeStamp(long timeStamp) { this.timeStamp = timeStamp; } @@ -196,11 +218,12 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { * This class sets start interval timestamp to record the activity of all members. * That is used by {@link GMSHealthMonitor#contactedBy(InternalDistributedMember)} to * record the activity of member. 
- * + * * It initiates the suspect processing for next neighbour if it doesn't see any activity from that * member in last interval(member-timeout) */ private class Monitor implements Runnable { + final long memberTimeoutInMillis; public Monitor(long memberTimeout) { @@ -213,25 +236,25 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { if (GMSHealthMonitor.this.isStopping) { return; } - + InternalDistributedMember neighbour = nextNeighbor; - + long currentTime = System.currentTimeMillis(); //this is the start of interval to record member activity GMSHealthMonitor.this.currentTimeStamp = currentTime; if (neighbour != null) { TimeStamp nextNeighborTS; - synchronized(GMSHealthMonitor.this) { + synchronized (GMSHealthMonitor.this) { nextNeighborTS = GMSHealthMonitor.this.memberTimeStamps.get(neighbour); } - + if (nextNeighborTS == null) { TimeStamp customTS = new TimeStamp(currentTime); memberTimeStamps.put(neighbour, customTS); return; } - + long interval = memberTimeoutInMillis / GMSHealthMonitor.LOGICAL_INTERVAL; long lastTS = currentTime - nextNeighborTS.getTime(); if (lastTS + interval >= memberTimeoutInMillis) { @@ -249,6 +272,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { * notify waiting thread. 
*/ private class Response { + private DistributionMessage responseMsg; public DistributionMessage getResponseMsg() { @@ -276,7 +300,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { OutputStream out = socket.getOutputStream(); @SuppressWarnings("UnusedAssignment") short version = in.readShort(); - int vmViewId = in.readInt(); + int vmViewId = in.readInt(); long uuidLSBs = in.readLong(); long uuidMSBs = in.readLong(); GMSHealthMonitor.this.stats.incFinalCheckRequestsReceived(); @@ -288,9 +312,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { int myVmViewId = gmbr.getVmViewId(); if (playingDead) { logger.debug("HealthMonitor: simulating sick member in health check"); - } else if (uuidLSBs == myUUID.getLeastSignificantBits() - && uuidMSBs == myUUID.getMostSignificantBits() - && vmViewId == myVmViewId) { + } else if (uuidLSBs == myUUID.getLeastSignificantBits() && uuidMSBs == myUUID.getMostSignificantBits() && vmViewId == myVmViewId) { logger.debug("HealthMonitor: sending OK reply"); out.write(OK); out.flush(); @@ -300,12 +322,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { logger.debug("HealthMonitor: server replied OK."); } else { if (logger.isDebugEnabled()) { - logger.debug("HealthMonitor: sending ERROR reply - my UUID is {},{} received is {},{}. My viewID is {} received is {}", - Long.toHexString(myUUID.getMostSignificantBits()), - Long.toHexString(myUUID.getLeastSignificantBits()), - Long.toHexString(uuidMSBs), - Long.toHexString(uuidLSBs), - myVmViewId, vmViewId); + logger.debug("HealthMonitor: sending ERROR reply - my UUID is {},{} received is {},{}. 
My viewID is {} received is {}", Long.toHexString(myUUID.getMostSignificantBits()), Long.toHexString(myUUID.getLeastSignificantBits()), Long.toHexString(uuidMSBs), Long.toHexString(uuidLSBs), myVmViewId, vmViewId); } out.write(ERROR); out.flush(); @@ -350,13 +367,14 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { public void contactedBy(InternalDistributedMember sender) { contactedBy(sender, currentTimeStamp); } - - + + /** * Record member activity at a specified time */ private void contactedBy(InternalDistributedMember sender, long timeStamp) { TimeStamp cTS = new TimeStamp(timeStamp); + //TODO Udo: why putIfAbsent. Surely only put is required cTS = memberTimeStamps.putIfAbsent(sender, cTS); if (cTS != null && cTS.getTime() < timeStamp) { cTS.setTime(timeStamp); @@ -367,7 +385,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { setNextNeighbor(currentView, null); } - + private HeartbeatRequestMessage constructHeartbeatRequestMessage(final InternalDistributedMember mbr) { final int reqId = requestId.getAndIncrement(); final HeartbeatRequestMessage hrm = new HeartbeatRequestMessage(mbr, reqId); @@ -390,7 +408,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } catch (CancelException e) { return; } - + if (!pinged) { suspectedMemberInView.put(mbr, currentView); String reason = "Member isn't responding to heartbeat requests"; @@ -442,7 +460,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { if (membersNotReceivedMsg != null && membersNotReceivedMsg.contains(member)) { // member is not part of current view. 
logger.trace("Member {} is not part of current view.", member); - } else if (waitForResponse){ + } else if (waitForResponse) { synchronized (pingResp) { if (pingResp.getResponseMsg() == null) { pingResp.wait(memberTimeout); @@ -470,7 +488,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } catch (InterruptedException e) { logger.debug("GMSHealthMonitor checking thread interrupted, while waiting for response from member: {} .", member); } finally { - if(waitForResponse) { + if (waitForResponse) { requestIdVsResponse.remove(hrm.getRequestId()); } } @@ -480,8 +498,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { /** * During final check, establish TCP connection between current member and suspect member. * And exchange PING/PONG message to see if the suspect member is still alive. - * + * * @param suspectMember member that does not respond to HeartbeatRequestMessage + * * @return true if successfully exchanged PING/PONG with TCP connection, otherwise false. 
*/ boolean doTCPCheckMember(InternalDistributedMember suspectMember, int port) { @@ -489,21 +508,17 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { InternalDistributedSystem internalDistributedSystem = InternalDistributedSystem.getConnectedInstance(); try { logger.debug("Checking member {} with TCP socket connection {}:{}.", suspectMember, suspectMember.getInetAddress(), port); - clientSocket = SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER).connect(suspectMember.getInetAddress(), port, - (int)memberTimeout, new ConnectTimeoutTask(services.getTimer(), memberTimeout), false, -1, false); + clientSocket = SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER).connect(suspectMember.getInetAddress(), port, (int) memberTimeout, new ConnectTimeoutTask(services.getTimer(), memberTimeout), false, -1, false); clientSocket.setTcpNoDelay(true); return doTCPCheckMember(suspectMember, clientSocket); - } - catch (IOException e) { + } catch (IOException e) { // this is expected if it is a connection-timeout or other failure // to connect - } - catch (IllegalStateException e) { + } catch (IllegalStateException e) { if (!isStopping) { logger.trace("Unexpected exception", e); } - } - finally { + } finally { try { if (clientSocket != null) { clientSocket.setSoLinger(true, 0); // abort the connection @@ -554,10 +569,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { return false; } catch (IOException e) { logger.trace("Unexpected exception", e); - } + } return false; } - + void writeMemberToStream(GMSMember gmbr, DataOutputStream out) throws IOException { out.writeShort(Version.CURRENT_ORDINAL); out.writeInt(gmbr.getVmViewId()); @@ -565,24 +580,24 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { out.writeLong(gmbr.getUuidMSBs()); out.flush(); } - + @Override public void suspect(InternalDistributedMember mbr, String reason) { 
initiateSuspicion(mbr, reason); // Background suspect-collecting thread is currently disabled - it takes too long -// synchronized (suspectRequests) { -// SuspectRequest sr = new SuspectRequest((InternalDistributedMember) mbr, reason); -// if (!suspectRequests.contains(sr)) { -// logger.info("Suspecting member {}. Reason= {}.", mbr, reason); -// suspectRequests.add(sr); -// suspectRequests.notify(); -// } -// } + // synchronized (suspectRequests) { + // SuspectRequest sr = new SuspectRequest((InternalDistributedMember) mbr, reason); + // if (!suspectRequests.contains(sr)) { + // logger.info("Suspecting member {}. Reason= {}.", mbr, reason); + // suspectRequests.add(sr); + // suspectRequests.notify(); + // } + // } } @Override public boolean checkIfAvailable(DistributedMember mbr, String reason, boolean initiateRemoval) { - return inlineCheckIfAvailable(localAddress, currentView, initiateRemoval, (InternalDistributedMember)mbr, reason); + return inlineCheckIfAvailable(localAddress, currentView, initiateRemoval, (InternalDistributedMember) mbr, reason); } public void start() { @@ -607,17 +622,17 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { long delay = memberTimeout / LOGICAL_INTERVAL; monitorFuture = scheduler.scheduleAtFixedRate(m, delay, delay, TimeUnit.MILLISECONDS); -// suspectRequestCollectorThread = this.new RequestCollector<SuspectRequest>("Geode Suspect Message Collector", Services.getThreadGroup(), suspectRequests, -// new Callback<SuspectRequest>() { -// @Override -// public void process(List<SuspectRequest> requests) { -// GMSHealthMonitor.this.sendSuspectRequest(requests); -// -// } -// }, MEMBER_SUSPECT_COLLECTION_INTERVAL); -// suspectRequestCollectorThread.setDaemon(true); -// suspectRequestCollectorThread.start() - + // suspectRequestCollectorThread = this.new RequestCollector<SuspectRequest>("Geode Suspect Message Collector", Services.getThreadGroup(), suspectRequests, + // new Callback<SuspectRequest>() { + // 
@Override + // public void process(List<SuspectRequest> requests) { + // GMSHealthMonitor.this.sendSuspectRequest(requests); + // + // } + // }, MEMBER_SUSPECT_COLLECTION_INTERVAL); + // suspectRequestCollectorThread.setDaemon(true); + // suspectRequestCollectorThread.start() + serverSocketExecutor = Executors.newCachedThreadPool(new ThreadFactory() { final AtomicInteger threadIdx = new AtomicInteger(); @@ -635,15 +650,14 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { ServerSocket createServerSocket(InetAddress socketAddress, int[] portRange) { ServerSocket serverSocket; try { - serverSocket = SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER).createServerSocketUsingPortRange(socketAddress, 50/*backlog*/, - true/*isBindAddress*/, false/*useNIO*/, 65536/*tcpBufferSize*/, portRange, false); + serverSocket = SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER).createServerSocketUsingPortRange(socketAddress, 50/*backlog*/, true/*isBindAddress*/, false/*useNIO*/, 65536/*tcpBufferSize*/, portRange, false); socketPort = serverSocket.getLocalPort(); } catch (IOException | SystemConnectException e) { throw new GemFireConfigException("Unable to allocate a failure detection port in the membership-port range", e); } return serverSocket; } - + /** * start the thread that listens for tcp/ip connections and responds * to connection attempts @@ -656,15 +670,14 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { logger.info("Started failure detection server thread on {}:{}.", ssocket.getInetAddress(), socketPort); Socket socket = null; try { - while (!services.getCancelCriterion().isCancelInProgress() - && !GMSHealthMonitor.this.isStopping) { + while (!services.getCancelCriterion().isCancelInProgress() && !GMSHealthMonitor.this.isStopping) { try { socket = ssocket.accept(); if (GMSHealthMonitor.this.playingDead) { continue; } 
serverSocketExecutor.execute(new ClientSocketHandler(socket)); //start(); [bruce] I'm seeing a lot of failures due to this thread not being created fast enough, sometimes as long as 30 seconds - + } catch (RejectedExecutionException e) { // this can happen during shutdown @@ -696,7 +709,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } }); } - + /** * start the thread that periodically sends a message to processes * that might be watching this process @@ -707,10 +720,11 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { Thread.currentThread().setName("Geode Heartbeat Sender"); sendPeriodicHeartbeats(); } + private void sendPeriodicHeartbeats() { while (!isStopping && !services.getCancelCriterion().isCancelInProgress()) { try { - Thread.sleep(memberTimeout/LOGICAL_INTERVAL); + Thread.sleep(memberTimeout / LOGICAL_INTERVAL); } catch (InterruptedException e) { return; } @@ -727,7 +741,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } } } - + private void sendHeartbeats(List<InternalDistributedMember> mbrs, int startIndex) { InternalDistributedMember coordinator = currentView.getCoordinator(); if (coordinator != null && !coordinator.equals(localAddress)) { @@ -746,10 +760,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { int index = startIndex; int numSent = 0; - for (;;) { + for (; ; ) { index--; if (index < 0) { - index = mbrs.size()-1; + index = mbrs.size() - 1; } InternalDistributedMember mbr = mbrs.get(index); if (mbr.equals(localAddress)) { @@ -782,32 +796,32 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { synchronized (viewVsSuspectedMembers) { viewVsSuspectedMembers.clear(); } - for (Iterator<InternalDistributedMember> it=memberTimeStamps.keySet().iterator(); it.hasNext(); ) { + for (Iterator<InternalDistributedMember> it = memberTimeStamps.keySet().iterator(); it.hasNext(); ) { if (!newView.contains(it.next())) { 
it.remove(); } } - for (Iterator<InternalDistributedMember> it=suspectedMemberInView.keySet().iterator(); it.hasNext(); ) { + for (Iterator<InternalDistributedMember> it = suspectedMemberInView.keySet().iterator(); it.hasNext(); ) { if (!newView.contains(it.next())) { it.remove(); } } -// for (InternalDistributedMember mbr: newView.getMembers()) { -// if (!memberVsLastMsgTS.containsKey(mbr)) { -// CustomTimeStamp customTS = new CustomTimeStamp(System.currentTimeMillis()); -// memberVsLastMsgTS.put(mbr, customTS); -// } -// } + // for (InternalDistributedMember mbr: newView.getMembers()) { + // if (!memberVsLastMsgTS.containsKey(mbr)) { + // CustomTimeStamp customTS = new CustomTimeStamp(System.currentTimeMillis()); + // memberVsLastMsgTS.put(mbr, customTS); + // } + // } currentView = newView; setNextNeighbor(newView, null); } /*** * This method sets next neighbour which it needs to watch in current view. - * + * * if nextTo == null * then it watches member next to it. - * + * * It becomes null when we suspect current neighbour, during that time it watches * member next to suspect member. 
*/ @@ -820,16 +834,34 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } List<InternalDistributedMember> allMembers = newView.getMembers(); - - Set<InternalDistributedMember> checkAllSuspected = new HashSet<>(allMembers); - checkAllSuspected.removeAll(suspectedMemberInView.keySet()); - checkAllSuspected.remove(localAddress); - if (checkAllSuspected.isEmpty() && allMembers.size() > 1) { - logger.info("All other members are suspect at this point"); - nextNeighbor = null; - return; + + // Set<InternalDistributedMember> checkAllSuspected = new HashSet<>(allMembers); + // checkAllSuspected.removeAll(suspectedMemberInView.keySet()); + // checkAllSuspected.remove(localAddress); + // if (checkAllSuspected.isEmpty() && allMembers.size() > 1) { + // logger.info("All other members are suspect at this point"); + // nextNeighbor = null; + // return; + // } + + if (allMembers.size() > 1 && suspectedMemberInView.size() >= allMembers.size() - 1) { + boolean nonSuspectFound = false; + for (InternalDistributedMember member : allMembers) { + if (member.equals(localAddress)) { + continue; + } + if (!suspectedMemberInView.containsKey(member)) { + nonSuspectFound = true; + break; + } + } + if (!nonSuspectFound) { + logger.info("All other members are suspect at this point"); + nextNeighbor = null; + return; + } } - + int index = allMembers.indexOf(nextTo); if (index != -1) { int nextNeighborIndex = (index + 1) % allMembers.size(); @@ -844,7 +876,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { nextNeighbor = newNeighbor; } } - + if (nextNeighbor != null && nextNeighbor.equals(localAddress)) { nextNeighbor = null; } @@ -866,10 +898,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { services.getMessenger().addHandler(HeartbeatMessage.class, this); services.getMessenger().addHandler(SuspectMembersMessage.class, this); } - + @Override public void started() { - setLocalAddress( 
services.getMessenger().getMemberID()); + setLocalAddress(services.getMessenger().getMemberID()); serverSocket = createServerSocket(localAddress.getInetAddress(), services.getConfig().getMembershipPortRange()); startTcpServer(serverSocket); startHeartbeatThread(); @@ -907,11 +939,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { serverSocket.close(); serverSocket = null; logger.info("GMSHealthMonitor server socket is closed in stopServices()."); - } - catch (IOException e) { + } catch (IOException e) { logger.trace("Unexpected exception", e); } - } + } serverSocketExecutor.shutdownNow(); try { serverSocketExecutor.awaitTermination(2000, TimeUnit.MILLISECONDS); @@ -920,10 +951,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } logger.info("GMSHealthMonitor serverSocketExecutor is " + (serverSocketExecutor.isTerminated() ? "terminated" : "not terminated")); } - -// if (suspectRequestCollectorThread != null) { -// suspectRequestCollectorThread.shutdown(); -// } + + // if (suspectRequestCollectorThread != null) { + // suspectRequestCollectorThread.shutdown(); + // } } /*** @@ -969,7 +1000,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { public void emergencyClose() { stopServices(); } - + void setLocalAddress(InternalDistributedMember idm) { this.localAddress = idm; } @@ -983,44 +1014,44 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { logger.trace("processing {}", m); switch (m.getDSFID()) { - case HEARTBEAT_REQUEST: - if (beingSick || playingDead) { - logger.debug("sick member is ignoring check request"); - } else { - processHeartbeatRequest((HeartbeatRequestMessage) m); - } - break; - case HEARTBEAT_RESPONSE: - if (beingSick || playingDead) { - logger.debug("sick member is ignoring check response"); - } else { - processHeartbeat((HeartbeatMessage) m); - } - break; - case SUSPECT_MEMBERS_MESSAGE: - if (beingSick || playingDead) { - logger.debug("sick 
member is ignoring suspect message"); - } else { - processSuspectMembersRequest((SuspectMembersMessage) m); - } - break; - default: - throw new IllegalArgumentException("unknown message type: " + m); + case HEARTBEAT_REQUEST: + if (beingSick || playingDead) { + logger.debug("sick member is ignoring check request"); + } else { + processHeartbeatRequest((HeartbeatRequestMessage) m); + } + break; + case HEARTBEAT_RESPONSE: + if (beingSick || playingDead) { + logger.debug("sick member is ignoring check response"); + } else { + processHeartbeat((HeartbeatMessage) m); + } + break; + case SUSPECT_MEMBERS_MESSAGE: + if (beingSick || playingDead) { + logger.debug("sick member is ignoring suspect message"); + } else { + processSuspectMembersRequest((SuspectMembersMessage) m); + } + break; + default: + throw new IllegalArgumentException("unknown message type: " + m); } } private void processHeartbeatRequest(HeartbeatRequestMessage m) { - + this.stats.incHeartbeatRequestsReceived(); - + if (this.isStopping || this.playingDead) { return; } - + // only respond if the intended recipient is this member InternalDistributedMember me = localAddress; - if (me.getVmViewId() >= 0 && m.getTarget().equals(me)) { + if (me.getVmViewId() >= 0 && m.getTarget().equals(me)) { HeartbeatMessage hm = new HeartbeatMessage(m.getRequestId()); hm.setRecipient(m.getSender()); Set<InternalDistributedMember> membersNotReceivedMsg = services.getMessenger().send(hm); @@ -1059,9 +1090,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { * for that member */ private void processSuspectMembersRequest(SuspectMembersMessage incomingRequest) { - + this.stats.incSuspectsReceived(); - + NetView cv = currentView; if (cv == null) { @@ -1096,11 +1127,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } } - + if (cv.getCoordinator().equals(localAddress)) { - for (SuspectRequest req: incomingRequest.getMembers()) { - logger.info("received suspect message from {} for 
{}: {}", - sender, req.getSuspectMember(), req.getReason()); + for (SuspectRequest req : incomingRequest.getMembers()) { + logger.info("received suspect message from {} for {}: {}", sender, req.getSuspectMember(), req.getReason()); } checkIfAvailable(sender, sMembers, cv); }// coordinator ends @@ -1120,9 +1150,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { InternalDistributedMember coordinator = check.getCoordinator(); if (coordinator != null && coordinator.equals(localAddress)) { // new coordinator - for (SuspectRequest req: incomingRequest.getMembers()) { - logger.info("received suspect message from {} for {}: {}", - sender, req.getSuspectMember(), req.getReason()); + for (SuspectRequest req : incomingRequest.getMembers()) { + logger.info("received suspect message from {} for {}: {}", sender, req.getSuspectMember(), req.getReason()); } checkIfAvailable(sender, smbr, cv); } else { @@ -1145,7 +1174,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { viewVsMembers = new HashSet<>(); viewVsSuspectedMembers.put(cv, viewVsMembers); } - for (SuspectRequest sr: sMembers) { + for (SuspectRequest sr : sMembers) { viewVsMembers.add(sr); } } @@ -1157,8 +1186,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { * we attempt to connect to its socket and ask if it's the expected member. * Otherwise we send a heartbeat request and wait for a reply. 
*/ - private void checkIfAvailable(final InternalDistributedMember initiator, - List<SuspectRequest> sMembers, final NetView cv) { + private void checkIfAvailable(final InternalDistributedMember initiator, List<SuspectRequest> sMembers, final NetView cv) { for (final SuspectRequest sr : sMembers) { final InternalDistributedMember mbr = sr.getSuspectMember(); @@ -1198,10 +1226,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } } - private boolean inlineCheckIfAvailable( - final InternalDistributedMember initiator, final NetView cv, - boolean initiateRemoval, - final InternalDistributedMember mbr, final String reason) { + private boolean inlineCheckIfAvailable(final InternalDistributedMember initiator, final NetView cv, boolean initiateRemoval, final InternalDistributedMember mbr, final String reason) { if (services.getJoinLeave().isMemberLeaving(mbr)) { return false; @@ -1216,7 +1241,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { // for some reason we used to update the timestamp for the member // with the startTime, but we don't want to do that because it looks // like a heartbeat has been received - + logger.info("Performing final check for suspect member {} reason={}", mbr, reason); boolean pinged; int port = cv.getFailureDetectionPort(mbr); @@ -1239,7 +1264,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { doCheckMember(mbr, false); pinged = doTCPCheckMember(mbr, port); } - + if (!pinged && !isStopping) { TimeStamp ts = memberTimeStamps.get(mbr); if (ts == null || ts.getTime() < startTime) { @@ -1263,11 +1288,11 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } return !failed; } - + @Override public void memberShutdown(DistributedMember mbr, String reason) { } - + @Override public int getFailureDetectionPort() { return this.socketPort; @@ -1275,21 +1300,21 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { private void 
sendSuspectRequest(final List<SuspectRequest> requests) { // the background suspect-collector thread is currently disabled -// synchronized (suspectRequests) { -// if (suspectRequests.size() > 0) { -// for (SuspectRequest sr: suspectRequests) { -// if (!requests.contains(sr)) { -// requests.add(sr); -// } -// } -// suspectRequests.clear(); -// } -// } + // synchronized (suspectRequests) { + // if (suspectRequests.size() > 0) { + // for (SuspectRequest sr: suspectRequests) { + // if (!requests.contains(sr)) { + // requests.add(sr); + // } + // } + // suspectRequests.clear(); + // } + // } logger.debug("Sending suspect request for members {}", requests); List<InternalDistributedMember> recipients; if (currentView.size() > 4) { HashSet<InternalDistributedMember> filter = new HashSet<>(); - for (Enumeration<InternalDistributedMember> e = suspectedMemberInView.keys(); e.hasMoreElements();) { + for (Enumeration<InternalDistributedMember> e = suspectedMemberInView.keys(); e.hasMoreElements(); ) { filter.add(e.nextElement()); } filter.addAll(requests.stream().map(SuspectRequest::getSuspectMember).collect(Collectors.toList())); @@ -1313,15 +1338,16 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } private static class ConnectTimeoutTask extends TimerTask implements ConnectionWatcher { + final Timer scheduler; Socket socket; final long timeout; - + ConnectTimeoutTask(Timer scheduler, long timeout) { this.scheduler = scheduler; this.timeout = timeout; } - + @Override public void beforeConnect(Socket socket) { this.socket = socket; @@ -1332,7 +1358,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { public void afterConnect(Socket socket) { cancel(); } - + @Override public void run() { try { @@ -1343,9 +1369,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { // ignored - nothing useful to do here } } - + } - + public DMStats getStats() { return this.stats; }
