GEODE-1874: Changed setNextNeighbor to not create a HashSet for every p2p invocation
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/b3d32850 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/b3d32850 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/b3d32850 Branch: refs/heads/develop Commit: b3d3285088c1c3950ba9c8e638e02ca8b852bd01 Parents: 3046ea8 Author: Udo Kohlmeyer <[email protected]> Authored: Wed Oct 12 11:54:33 2016 +1100 Committer: Udo Kohlmeyer <[email protected]> Committed: Tue Nov 8 07:09:19 2016 +1100 ---------------------------------------------------------------------- .../membership/gms/fd/GMSHealthMonitor.java | 445 ++++++++++--------- 1 file changed, 224 insertions(+), 221 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b3d32850/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java index f3319ac..97a413c 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java @@ -1,22 +1,22 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. 
You may obtain a - * copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ package org.apache.geode.distributed.internal.membership.gms.fd; -import static org.apache.geode.internal.DataSerializableFixedID.HEARTBEAT_REQUEST; -import static org.apache.geode.internal.DataSerializableFixedID.HEARTBEAT_RESPONSE; -import static org.apache.geode.internal.DataSerializableFixedID.SUSPECT_MEMBERS_MESSAGE; +import static org.apache.geode.internal.DataSerializableFixedID.*; import java.io.DataInputStream; import java.io.DataOutputStream; @@ -27,7 +27,19 @@ import java.net.InetAddress; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketTimeoutException; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; @@ -38,7 +50,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.*; +import java.util.stream.Collectors; import org.apache.logging.log4j.Logger; import org.jgroups.util.UUID; @@ -67,23 +79,25 @@ import org.apache.geode.internal.security.SecurableCommunicationChannel; /** * Failure Detection - * - * This class make sure that each member is alive and communicating to this member. To make sure - * that we create the ring of members based on current view. On this ring, each member make sure - * that next-member in ring is communicating with it. For that we record last message timestamp from - * next-member. 
And if it sees this member has not communicated in last period(member-timeout) then - * we check whether this member is still alive or not. Based on that we informed probable - * coordinators to remove that member from view. - * - * It has {@link #suspect(InternalDistributedMember, String)} api, which can be used to initiate - * suspect processing for any member. First is checks whether the member is responding or not. Then - * it informs probable coordinators to remove that member from view. - * - * It has {@link #checkIfAvailable(DistributedMember, String, boolean)} api to see if that member is - * alive. Then based on removal flag it initiates the suspect processing for that member. - * + * <p> + * This class make sure that each member is alive and communicating to this member. + * To make sure that we create the ring of members based on current view. On this + * ring, each member make sure that next-member in ring is communicating with it. + * For that we record last message timestamp from next-member. And if it sees this + * member has not communicated in last period(member-timeout) then we check whether + * this member is still alive or not. Based on that we informed probable coordinators + * to remove that member from view. + * <p> + * It has {@link #suspect(InternalDistributedMember, String)} api, which can be used + * to initiate suspect processing for any member. First is checks whether the member is + * responding or not. Then it informs probable coordinators to remove that member from + * view. + * <p> + * It has {@link #checkIfAvailable(DistributedMember, String, boolean)} api to see + * if that member is alive. Then based on removal flag it initiates the suspect processing + * for that member. 
*/ -@SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", "NullableProblems"}) +@SuppressWarnings({ "SynchronizationOnLocalVariableOrMethodParameter", "NullableProblems" }) public class GMSHealthMonitor implements HealthMonitor, MessageHandler { private Services services; @@ -94,49 +108,50 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { volatile private boolean isStopping = false; private final AtomicInteger requestId = new AtomicInteger(); - /** membership logger */ + /** + * membership logger + */ private static final Logger logger = Services.getLogger(); /** - * The number of recipients of periodic heartbeats. The recipients will be selected from the - * members that are likely to be monitoring this member. + * The number of recipients of periodic heartbeats. The recipients will + * be selected from the members that are likely to be monitoring this member. */ private static final int NUM_HEARTBEATS = Integer.getInteger("geode.heartbeat-recipients", 2); /** - * Member activity will be recorded per interval/period. Timer task will set interval's starting - * time. Each interval will be member-timeout/LOGICAL_INTERVAL. LOGICAL_INTERVAL may be configured + * Member activity will be recorded per interval/period. Timer task will set interval's starting time. + * Each interval will be member-timeout/LOGICAL_INTERVAL. LOGICAL_INTERVAL may be configured * via a system property with a default of 2. At least 1 interval is needed. 
*/ - public static final int LOGICAL_INTERVAL = - Integer.getInteger("geode.logical-message-received-interval", 2); + public static final int LOGICAL_INTERVAL = Integer.getInteger("geode.logical-message-received-interval", 2); - /** stall time to wait for members leaving concurrently */ - public static final long MEMBER_SUSPECT_COLLECTION_INTERVAL = - Long.getLong("geode.suspect-member-collection-interval", 200); + /** + * stall time to wait for members leaving concurrently + */ + public static final long MEMBER_SUSPECT_COLLECTION_INTERVAL = Long.getLong("geode.suspect-member-collection-interval", 200); private volatile long currentTimeStamp; - /** this member's ID */ + /** + * this member's ID + */ private InternalDistributedMember localAddress; /** * Timestamp at which we last had contact from a member */ - final ConcurrentMap<InternalDistributedMember, TimeStamp> memberTimeStamps = - new ConcurrentHashMap<>(); + final ConcurrentMap<InternalDistributedMember, TimeStamp> memberTimeStamps = new ConcurrentHashMap<>(); /** * Members currently being suspected and the view they were suspected in */ - final private ConcurrentHashMap<InternalDistributedMember, NetView> suspectedMemberInView = - new ConcurrentHashMap<>(); + final private ConcurrentHashMap<InternalDistributedMember, NetView> suspectedMemberInView = new ConcurrentHashMap<>(); /** * Members undergoing final checks */ - final private List<InternalDistributedMember> membersInFinalCheck = - Collections.synchronizedList(new ArrayList<>(30)); + final private List<InternalDistributedMember> membersInFinalCheck = Collections.synchronizedList(new ArrayList<>(30)); /** * Replies to messages @@ -157,10 +172,14 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { */ private ScheduledFuture<?> monitorFuture; - /** test hook */ + /** + * test hook + */ private volatile boolean playingDead = false; - /** test hook */ + /** + * test hook + */ private volatile boolean beingSick = false; // For TCP 
check @@ -170,13 +189,16 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { private volatile int socketPort; private volatile ServerSocket serverSocket; - /** Statistics about health monitor */ + /** + * Statistics about health monitor + */ private DMStats stats; /** * this class is to avoid garbage */ private static class TimeStamp { + private volatile long timeStamp; TimeStamp(long timeStamp) { @@ -193,14 +215,15 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } /*** - * This class sets start interval timestamp to record the activity of all members. That is used by - * {@link GMSHealthMonitor#contactedBy(InternalDistributedMember)} to record the activity of - * member. - * + * This class sets start interval timestamp to record the activity of all members. + * That is used by {@link GMSHealthMonitor#contactedBy(InternalDistributedMember)} to + * record the activity of member. + * * It initiates the suspect processing for next neighbour if it doesn't see any activity from that * member in last interval(member-timeout) */ private class Monitor implements Runnable { + final long memberTimeoutInMillis; public Monitor(long memberTimeout) { @@ -217,7 +240,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { InternalDistributedMember neighbour = nextNeighbor; long currentTime = System.currentTimeMillis(); - // this is the start of interval to record member activity + //this is the start of interval to record member activity GMSHealthMonitor.this.currentTimeStamp = currentTime; if (neighbour != null) { @@ -245,10 +268,11 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { /*** * Check thread waits on this object for response. It puts requestId in requestIdVsResponse map. - * Response will have requestId, which is used to get ResponseObject. Then it is used to notify - * waiting thread. + * Response will have requestId, which is used to get ResponseObject. 
Then it is used to + * notify waiting thread. */ private class Response { + private DistributionMessage responseMsg; public DistributionMessage getResponseMsg() { @@ -288,8 +312,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { int myVmViewId = gmbr.getVmViewId(); if (playingDead) { logger.debug("HealthMonitor: simulating sick member in health check"); - } else if (uuidLSBs == myUUID.getLeastSignificantBits() - && uuidMSBs == myUUID.getMostSignificantBits() && vmViewId == myVmViewId) { + } else if (uuidLSBs == myUUID.getLeastSignificantBits() && uuidMSBs == myUUID.getMostSignificantBits() && vmViewId == myVmViewId) { logger.debug("HealthMonitor: sending OK reply"); out.write(OK); out.flush(); @@ -299,11 +322,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { logger.debug("HealthMonitor: server replied OK."); } else { if (logger.isDebugEnabled()) { - logger.debug( - "HealthMonitor: sending ERROR reply - my UUID is {},{} received is {},{}. My viewID is {} received is {}", - Long.toHexString(myUUID.getMostSignificantBits()), - Long.toHexString(myUUID.getLeastSignificantBits()), Long.toHexString(uuidMSBs), - Long.toHexString(uuidLSBs), myVmViewId, vmViewId); + logger.debug("HealthMonitor: sending ERROR reply - my UUID is {},{} received is {},{}. My viewID is {} received is {}", Long.toHexString(myUUID.getMostSignificantBits()), Long.toHexString(myUUID.getLeastSignificantBits()), Long.toHexString(uuidMSBs), Long.toHexString(uuidLSBs), myVmViewId, vmViewId); } out.write(ERROR); out.flush(); @@ -338,7 +357,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } @SuppressWarnings("EmptyMethod") - public static void loadEmergencyClasses() {} + public static void loadEmergencyClasses() { + } /* * Record the member activity for current time interval. 
@@ -354,6 +374,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { */ private void contactedBy(InternalDistributedMember sender, long timeStamp) { TimeStamp cTS = new TimeStamp(timeStamp); + //TODO Udo: why putIfAbsent. Surely only put is required cTS = memberTimeStamps.putIfAbsent(sender, cTS); if (cTS != null && cTS.getTime() < timeStamp) { cTS.setTime(timeStamp); @@ -365,8 +386,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } - private HeartbeatRequestMessage constructHeartbeatRequestMessage( - final InternalDistributedMember mbr) { + private HeartbeatRequestMessage constructHeartbeatRequestMessage(final InternalDistributedMember mbr) { final int reqId = requestId.getAndIncrement(); final HeartbeatRequestMessage hrm = new HeartbeatRequestMessage(mbr, reqId); hrm.setRecipient(mbr); @@ -414,8 +434,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } /** - * This method sends heartbeat request to other member and waits for member-timeout time for - * response. If it doesn't see response then it returns false. + * This method sends heartbeat request to other member and waits for member-timeout + * time for response. If it doesn't see response then it returns false. 
*/ private boolean doCheckMember(InternalDistributedMember member, boolean waitForResponse) { if (playingDead || beingSick) { @@ -466,9 +486,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } } } catch (InterruptedException e) { - logger.debug( - "GMSHealthMonitor checking thread interrupted, while waiting for response from member: {} .", - member); + logger.debug("GMSHealthMonitor checking thread interrupted, while waiting for response from member: {} .", member); } finally { if (waitForResponse) { requestIdVsResponse.remove(hrm.getRequestId()); @@ -478,23 +496,19 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } /** - * During final check, establish TCP connection between current member and suspect member. And - * exchange PING/PONG message to see if the suspect member is still alive. - * + * During final check, establish TCP connection between current member and suspect member. + * And exchange PING/PONG message to see if the suspect member is still alive. + * * @param suspectMember member that does not respond to HeartbeatRequestMessage + * * @return true if successfully exchanged PING/PONG with TCP connection, otherwise false. 
*/ boolean doTCPCheckMember(InternalDistributedMember suspectMember, int port) { Socket clientSocket = null; - InternalDistributedSystem internalDistributedSystem = - InternalDistributedSystem.getConnectedInstance(); + InternalDistributedSystem internalDistributedSystem = InternalDistributedSystem.getConnectedInstance(); try { - logger.debug("Checking member {} with TCP socket connection {}:{}.", suspectMember, - suspectMember.getInetAddress(), port); - clientSocket = - SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER) - .connect(suspectMember.getInetAddress(), port, (int) memberTimeout, - new ConnectTimeoutTask(services.getTimer(), memberTimeout), false, -1, false); + logger.debug("Checking member {} with TCP socket connection {}:{}.", suspectMember, suspectMember.getInetAddress(), port); + clientSocket = SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER).connect(suspectMember.getInetAddress(), port, (int) memberTimeout, new ConnectTimeoutTask(services.getTimer(), memberTimeout), false, -1, false); clientSocket.setTcpNoDelay(true); return doTCPCheckMember(suspectMember, clientSocket); } catch (IOException e) { @@ -517,7 +531,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { return false; } - // Package protected for testing purposes + //Package protected for testing purposes boolean doTCPCheckMember(InternalDistributedMember suspectMember, Socket clientSocket) { try { if (clientSocket.isConnected()) { @@ -531,8 +545,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { logger.debug("Connected to suspect member - reading response"); int b = in.read(); if (logger.isDebugEnabled()) { - logger.debug("Received {}", - (b == OK ? "OK" : (b == ERROR ? "ERROR" : "unknown response: " + b))); + logger.debug("Received {}", (b == OK ? "OK" : (b == ERROR ? 
"ERROR" : "unknown response: " + b))); } if (b >= 0) { this.stats.incFinalCheckResponsesReceived(); @@ -545,7 +558,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } return true; } else { - // received ERROR + //received ERROR return false; } } else {// cannot establish TCP connection with suspect member @@ -572,20 +585,19 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { public void suspect(InternalDistributedMember mbr, String reason) { initiateSuspicion(mbr, reason); // Background suspect-collecting thread is currently disabled - it takes too long - // synchronized (suspectRequests) { - // SuspectRequest sr = new SuspectRequest((InternalDistributedMember) mbr, reason); - // if (!suspectRequests.contains(sr)) { - // logger.info("Suspecting member {}. Reason= {}.", mbr, reason); - // suspectRequests.add(sr); - // suspectRequests.notify(); - // } - // } + // synchronized (suspectRequests) { + // SuspectRequest sr = new SuspectRequest((InternalDistributedMember) mbr, reason); + // if (!suspectRequests.contains(sr)) { + // logger.info("Suspecting member {}. 
Reason= {}.", mbr, reason); + // suspectRequests.add(sr); + // suspectRequests.notify(); + // } + // } } @Override public boolean checkIfAvailable(DistributedMember mbr, String reason, boolean initiateRemoval) { - return inlineCheckIfAvailable(localAddress, currentView, initiateRemoval, - (InternalDistributedMember) mbr, reason); + return inlineCheckIfAvailable(localAddress, currentView, initiateRemoval, (InternalDistributedMember) mbr, reason); } public void start() { @@ -601,8 +613,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { @Override public Thread newThread(Runnable r) { int id = threadIdx.getAndIncrement(); - Thread th = - new Thread(Services.getThreadGroup(), r, "Geode Failure Detection thread " + id); + Thread th = new Thread(Services.getThreadGroup(), r, "Geode Failure Detection thread " + id); th.setDaemon(true); return th; } @@ -611,17 +622,16 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { long delay = memberTimeout / LOGICAL_INTERVAL; monitorFuture = scheduler.scheduleAtFixedRate(m, delay, delay, TimeUnit.MILLISECONDS); - // suspectRequestCollectorThread = this.new RequestCollector<SuspectRequest>("Geode Suspect - // Message Collector", Services.getThreadGroup(), suspectRequests, - // new Callback<SuspectRequest>() { - // @Override - // public void process(List<SuspectRequest> requests) { - // GMSHealthMonitor.this.sendSuspectRequest(requests); + // suspectRequestCollectorThread = this.new RequestCollector<SuspectRequest>("Geode Suspect Message Collector", Services.getThreadGroup(), suspectRequests, + // new Callback<SuspectRequest>() { + // @Override + // public void process(List<SuspectRequest> requests) { + // GMSHealthMonitor.this.sendSuspectRequest(requests); // - // } - // }, MEMBER_SUSPECT_COLLECTION_INTERVAL); - // suspectRequestCollectorThread.setDaemon(true); - // suspectRequestCollectorThread.start() + // } + // }, MEMBER_SUSPECT_COLLECTION_INTERVAL); + // 
suspectRequestCollectorThread.setDaemon(true); + // suspectRequestCollectorThread.start() serverSocketExecutor = Executors.newCachedThreadPool(new ThreadFactory() { final AtomicInteger threadIdx = new AtomicInteger(); @@ -629,8 +639,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { @Override public Thread newThread(Runnable r) { int id = threadIdx.getAndIncrement(); - Thread th = - new Thread(Services.getThreadGroup(), r, "Geode Failure Detection Server thread " + id); + Thread th = new Thread(Services.getThreadGroup(), r, "Geode Failure Detection Server thread " + id); th.setDaemon(true); return th; } @@ -641,44 +650,33 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { ServerSocket createServerSocket(InetAddress socketAddress, int[] portRange) { ServerSocket serverSocket; try { - serverSocket = SocketCreatorFactory - .getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER) - .createServerSocketUsingPortRange(socketAddress, 50/* backlog */, true/* isBindAddress */, - false/* useNIO */, 65536/* tcpBufferSize */, portRange, false); + serverSocket = SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER).createServerSocketUsingPortRange(socketAddress, 50/*backlog*/, true/*isBindAddress*/, false/*useNIO*/, 65536/*tcpBufferSize*/, portRange, false); socketPort = serverSocket.getLocalPort(); } catch (IOException | SystemConnectException e) { - throw new GemFireConfigException( - "Unable to allocate a failure detection port in the membership-port range", e); + throw new GemFireConfigException("Unable to allocate a failure detection port in the membership-port range", e); } return serverSocket; } /** - * start the thread that listens for tcp/ip connections and responds to connection attempts + * start the thread that listens for tcp/ip connections and responds + * to connection attempts */ private void startTcpServer(ServerSocket ssocket) { // allocate a socket here so there 
are no race conditions between knowing the FD // socket port and joining the system serverSocketExecutor.execute(() -> { - logger.info("Started failure detection server thread on {}:{}.", ssocket.getInetAddress(), - socketPort); + logger.info("Started failure detection server thread on {}:{}.", ssocket.getInetAddress(), socketPort); Socket socket = null; try { - while (!services.getCancelCriterion().isCancelInProgress() - && !GMSHealthMonitor.this.isStopping) { + while (!services.getCancelCriterion().isCancelInProgress() && !GMSHealthMonitor.this.isStopping) { try { socket = ssocket.accept(); if (GMSHealthMonitor.this.playingDead) { continue; } - serverSocketExecutor.execute(new ClientSocketHandler(socket)); // start(); [bruce] I'm - // seeing a lot of - // failures due to this - // thread not being - // created fast enough, - // sometimes as long as - // 30 seconds + serverSocketExecutor.execute(new ClientSocketHandler(socket)); //start(); [bruce] I'm seeing a lot of failures due to this thread not being created fast enough, sometimes as long as 30 seconds } catch (RejectedExecutionException e) { // this can happen during shutdown @@ -713,8 +711,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } /** - * start the thread that periodically sends a message to processes that might be watching this - * process + * start the thread that periodically sends a message to processes + * that might be watching this process */ private void startHeartbeatThread() { checkExecutor.execute(new Runnable() { @@ -762,7 +760,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { int index = startIndex; int numSent = 0; - for (;;) { + for (; ; ) { index--; if (index < 0) { index = mbrs.size() - 1; @@ -798,35 +796,34 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { synchronized (viewVsSuspectedMembers) { viewVsSuspectedMembers.clear(); } - for (Iterator<InternalDistributedMember> it = 
memberTimeStamps.keySet().iterator(); it - .hasNext();) { + for (Iterator<InternalDistributedMember> it = memberTimeStamps.keySet().iterator(); it.hasNext(); ) { if (!newView.contains(it.next())) { it.remove(); } } - for (Iterator<InternalDistributedMember> it = suspectedMemberInView.keySet().iterator(); it - .hasNext();) { + for (Iterator<InternalDistributedMember> it = suspectedMemberInView.keySet().iterator(); it.hasNext(); ) { if (!newView.contains(it.next())) { it.remove(); } } - // for (InternalDistributedMember mbr: newView.getMembers()) { - // if (!memberVsLastMsgTS.containsKey(mbr)) { - // CustomTimeStamp customTS = new CustomTimeStamp(System.currentTimeMillis()); - // memberVsLastMsgTS.put(mbr, customTS); - // } - // } + // for (InternalDistributedMember mbr: newView.getMembers()) { + // if (!memberVsLastMsgTS.containsKey(mbr)) { + // CustomTimeStamp customTS = new CustomTimeStamp(System.currentTimeMillis()); + // memberVsLastMsgTS.put(mbr, customTS); + // } + // } currentView = newView; setNextNeighbor(newView, null); } /*** * This method sets next neighbour which it needs to watch in current view. - * - * if nextTo == null then it watches member next to it. - * - * It becomes null when we suspect current neighbour, during that time it watches member next to - * suspect member. + * + * if nextTo == null + * then it watches member next to it. + * + * It becomes null when we suspect current neighbour, during that time it watches + * member next to suspect member. 
*/ private synchronized void setNextNeighbor(NetView newView, InternalDistributedMember nextTo) { if (newView == null) { @@ -838,13 +835,31 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { List<InternalDistributedMember> allMembers = newView.getMembers(); - Set<InternalDistributedMember> checkAllSuspected = new HashSet<>(allMembers); - checkAllSuspected.removeAll(suspectedMemberInView.keySet()); - checkAllSuspected.remove(localAddress); - if (checkAllSuspected.isEmpty() && allMembers.size() > 1) { - logger.info("All other members are suspect at this point"); - nextNeighbor = null; - return; + // Set<InternalDistributedMember> checkAllSuspected = new HashSet<>(allMembers); + // checkAllSuspected.removeAll(suspectedMemberInView.keySet()); + // checkAllSuspected.remove(localAddress); + // if (checkAllSuspected.isEmpty() && allMembers.size() > 1) { + // logger.info("All other members are suspect at this point"); + // nextNeighbor = null; + // return; + // } + + if (allMembers.size() > 1 && suspectedMemberInView.size() >= allMembers.size() - 1) { + boolean nonSuspectFound = false; + for (InternalDistributedMember member : allMembers) { + if (member.equals(localAddress)) { + continue; + } + if (!suspectedMemberInView.containsKey(member)) { + nonSuspectFound = true; + break; + } + } + if (!nonSuspectFound) { + logger.info("All other members are suspect at this point"); + nextNeighbor = null; + return; + } } int index = allMembers.indexOf(nextTo); @@ -887,8 +902,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { @Override public void started() { setLocalAddress(services.getMessenger().getMemberID()); - serverSocket = createServerSocket(localAddress.getInetAddress(), - services.getConfig().getMembershipPortRange()); + serverSocket = createServerSocket(localAddress.getInetAddress(), services.getConfig().getMembershipPortRange()); startTcpServer(serverSocket); startHeartbeatThread(); } @@ -935,21 +949,19 @@ public class 
GMSHealthMonitor implements HealthMonitor, MessageHandler { } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - logger.info("GMSHealthMonitor serverSocketExecutor is " - + (serverSocketExecutor.isTerminated() ? "terminated" : "not terminated")); + logger.info("GMSHealthMonitor serverSocketExecutor is " + (serverSocketExecutor.isTerminated() ? "terminated" : "not terminated")); } - // if (suspectRequestCollectorThread != null) { - // suspectRequestCollectorThread.shutdown(); - // } + // if (suspectRequestCollectorThread != null) { + // suspectRequestCollectorThread.shutdown(); + // } } /*** * test method */ public boolean isShutdown() { - return scheduler.isShutdown() && checkExecutor.isShutdown() - && serverSocketExecutor.isShutdown() /* && !suspectRequestCollectorThread.isAlive() */; + return scheduler.isShutdown() && checkExecutor.isShutdown() && serverSocketExecutor.isShutdown() /*&& !suspectRequestCollectorThread.isAlive()*/; } /** @@ -965,8 +977,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } @Override - public void memberSuspected(InternalDistributedMember initiator, - InternalDistributedMember suspect, String reason) {} + public void memberSuspected(InternalDistributedMember initiator, InternalDistributedMember suspect, String reason) { + } @Override public void beSick() { @@ -1059,23 +1071,23 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { contactedBy(m.getSender(), System.currentTimeMillis()); } else { Response resp = requestIdVsResponse.get(m.getRequestId()); - logger.trace("Got heartbeat from member {}. {}", m.getSender(), - (resp != null ? "Check thread still waiting" : "Check thread is not waiting")); + logger.trace("Got heartbeat from member {}. {}", m.getSender(), (resp != null ? 
"Check thread still waiting" : "Check thread is not waiting")); if (resp != null) { synchronized (resp) { resp.setResponseMsg(m); resp.notify(); } } - // we got heartbeat lets update timestamp + //we got heartbeat lets update timestamp contactedBy(m.getSender(), System.currentTimeMillis()); } } /** - * Process a Suspect request from another member. This may cause this member to become the new - * membership coordinator. it will to final check on that member and then it will send remove - * request for that member + * Process a Suspect request from another member. This may cause this member + * to become the new membership coordinator. + * it will to final check on that member and then it will send remove request + * for that member */ private void processSuspectMembersRequest(SuspectMembersMessage incomingRequest) { @@ -1092,16 +1104,14 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { InternalDistributedMember sender = incomingRequest.getSender(); int viewId = sender.getVmViewId(); if (cv.getViewId() >= viewId && !cv.contains(incomingRequest.getSender())) { - logger.info("Membership ignoring suspect request for " + incomingRequest + " from non-member " - + incomingRequest.getSender()); - services.getJoinLeave().remove(sender, - "this process is initiating suspect processing but is no longer a member"); + logger.info("Membership ignoring suspect request for " + incomingRequest + " from non-member " + incomingRequest.getSender()); + services.getJoinLeave().remove(sender, "this process is initiating suspect processing but is no longer a member"); return; } // take care of any suspicion of this member by sending a heartbeat back if (!playingDead) { - for (Iterator<SuspectRequest> it = incomingRequest.getMembers().iterator(); it.hasNext();) { + for (Iterator<SuspectRequest> it = incomingRequest.getMembers().iterator(); it.hasNext(); ) { SuspectRequest req = it.next(); if (req.getSuspectMember().equals(localAddress)) { HeartbeatMessage message 
= new HeartbeatMessage(-1); @@ -1120,11 +1130,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { if (cv.getCoordinator().equals(localAddress)) { for (SuspectRequest req : incomingRequest.getMembers()) { - logger.info("received suspect message from {} for {}: {}", sender, req.getSuspectMember(), - req.getReason()); + logger.info("received suspect message from {} for {}: {}", sender, req.getSuspectMember(), req.getReason()); } checkIfAvailable(sender, sMembers, cv); - } // coordinator ends + }// coordinator ends else { NetView check = new NetView(cv, cv.getViewId() + 1); @@ -1142,8 +1151,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { if (coordinator != null && coordinator.equals(localAddress)) { // new coordinator for (SuspectRequest req : incomingRequest.getMembers()) { - logger.info("received suspect message from {} for {}: {}", sender, req.getSuspectMember(), - req.getReason()); + logger.info("received suspect message from {} for {}: {}", sender, req.getSuspectMember(), req.getReason()); } checkIfAvailable(sender, smbr, cv); } else { @@ -1154,8 +1162,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } /*** - * This method make sure that records suspectRequest. We need to make sure this on preferred - * coordinators, as elder coordinator might be in suspected list next. + * This method make sure that records suspectRequest. We need to make sure this + * on preferred coordinators, as elder coordinator might be in suspected list next. */ private void recordSuspectRequests(List<SuspectRequest> sMembers, NetView cv) { // record suspect requests @@ -1173,12 +1181,12 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } /** - * performs a "final" health check on the member. If failure-detection socket information is - * available for the member (in the view) then we attempt to connect to its socket and ask if it's - * the expected member. 
Otherwise we send a heartbeat request and wait for a reply. + * performs a "final" health check on the member. If failure-detection + * socket information is available for the member (in the view) then + * we attempt to connect to its socket and ask if it's the expected member. + * Otherwise we send a heartbeat request and wait for a reply. */ - private void checkIfAvailable(final InternalDistributedMember initiator, - List<SuspectRequest> sMembers, final NetView cv) { + private void checkIfAvailable(final InternalDistributedMember initiator, List<SuspectRequest> sMembers, final NetView cv) { for (final SuspectRequest sr : sMembers) { final InternalDistributedMember mbr = sr.getSuspectMember(); @@ -1193,13 +1201,13 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { // suspectMemberInView is now set by the heartbeat monitoring code // to allow us to move on from watching members we've already - // suspected. Since that code is updating this collection we + // suspected. Since that code is updating this collection we // cannot use it here as an indication that a member is currently // undergoing a final check. - // NetView view; - // view = suspectedMemberInView.putIfAbsent(mbr, cv); + // NetView view; + // view = suspectedMemberInView.putIfAbsent(mbr, cv); - // if (view == null || !view.equals(cv)) { + // if (view == null || !view.equals(cv)) { final String reason = sr.getReason(); logger.debug("Scheduling final check for member {}; reason={}", mbr, reason); // its a coordinator @@ -1214,13 +1222,11 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { GMSHealthMonitor.this.suspectedMemberInView.remove(mbr); } }); - // }// scheduling for final check and removing it.. + // }// scheduling for final check and removing it.. 
} } - private boolean inlineCheckIfAvailable(final InternalDistributedMember initiator, - final NetView cv, boolean initiateRemoval, final InternalDistributedMember mbr, - final String reason) { + private boolean inlineCheckIfAvailable(final InternalDistributedMember initiator, final NetView cv, boolean initiateRemoval, final InternalDistributedMember mbr, final String reason) { if (services.getJoinLeave().isMemberLeaving(mbr)) { return false; @@ -1242,8 +1248,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { if (port <= 0) { logger.info("Unable to locate failure detection port - requesting a heartbeat"); if (logger.isDebugEnabled()) { - logger.debug("\ncurrent view: {}\nports: {}", cv, - Arrays.toString(cv.getFailureDetectionPorts())); + logger.debug("\ncurrent view: {}\nports: {}", cv, Arrays.toString(cv.getFailureDetectionPorts())); } pinged = GMSHealthMonitor.this.doCheckMember(mbr, true); GMSHealthMonitor.this.stats.incFinalCheckRequestsSent(); @@ -1253,9 +1258,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { GMSHealthMonitor.this.stats.incUdpFinalCheckResponsesReceived(); } } else { - // this will just send heartbeat request, it will not wait for response - // if we will get heartbeat then it will change the timestamp, which we are - // checking below in case of tcp check failure.. + //this will just send heartbeat request, it will not wait for response + //if we will get heartbeat then it will change the timestamp, which we are + //checking below in case of tcp check failure.. 
doCheckMember(mbr, false); pinged = doTCPCheckMember(mbr, port); } @@ -1269,8 +1274,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } failed = true; } else { - logger.info( - "Final check failed but detected recent message traffic for suspect member " + mbr); + logger.info("Final check failed but detected recent message traffic for suspect member " + mbr); } } if (!failed) { @@ -1286,7 +1290,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } @Override - public void memberShutdown(DistributedMember mbr, String reason) {} + public void memberShutdown(DistributedMember mbr, String reason) { + } @Override public int getFailureDetectionPort() { @@ -1295,28 +1300,25 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { private void sendSuspectRequest(final List<SuspectRequest> requests) { // the background suspect-collector thread is currently disabled - // synchronized (suspectRequests) { - // if (suspectRequests.size() > 0) { - // for (SuspectRequest sr: suspectRequests) { - // if (!requests.contains(sr)) { - // requests.add(sr); - // } - // } - // suspectRequests.clear(); - // } - // } + // synchronized (suspectRequests) { + // if (suspectRequests.size() > 0) { + // for (SuspectRequest sr: suspectRequests) { + // if (!requests.contains(sr)) { + // requests.add(sr); + // } + // } + // suspectRequests.clear(); + // } + // } logger.debug("Sending suspect request for members {}", requests); List<InternalDistributedMember> recipients; if (currentView.size() > 4) { HashSet<InternalDistributedMember> filter = new HashSet<>(); - for (Enumeration<InternalDistributedMember> e = suspectedMemberInView.keys(); e - .hasMoreElements();) { + for (Enumeration<InternalDistributedMember> e = suspectedMemberInView.keys(); e.hasMoreElements(); ) { filter.add(e.nextElement()); } - filter.addAll( - requests.stream().map(SuspectRequest::getSuspectMember).collect(Collectors.toList())); - recipients = - 
currentView.getPreferredCoordinators(filter, services.getJoinLeave().getMemberID(), 5); + filter.addAll(requests.stream().map(SuspectRequest::getSuspectMember).collect(Collectors.toList())); + recipients = currentView.getPreferredCoordinators(filter, services.getJoinLeave().getMemberID(), 5); } else { recipients = currentView.getMembers(); } @@ -1336,6 +1338,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } private static class ConnectTimeoutTask extends TimerTask implements ConnectionWatcher { + final Timer scheduler; Socket socket; final long timeout;
