GEODE-1874: Changed setNextNeighbor to not create a HashSet for every p2p 
invocation


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/b3d32850
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/b3d32850
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/b3d32850

Branch: refs/heads/develop
Commit: b3d3285088c1c3950ba9c8e638e02ca8b852bd01
Parents: 3046ea8
Author: Udo Kohlmeyer <[email protected]>
Authored: Wed Oct 12 11:54:33 2016 +1100
Committer: Udo Kohlmeyer <[email protected]>
Committed: Tue Nov 8 07:09:19 2016 +1100

----------------------------------------------------------------------
 .../membership/gms/fd/GMSHealthMonitor.java     | 445 ++++++++++---------
 1 file changed, 224 insertions(+), 221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b3d32850/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index f3319ac..97a413c 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -1,22 +1,22 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
- * agreements. See the NOTICE file distributed with this work for additional 
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the 
License. You may obtain a
- * copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
- * or implied. See the License for the specific language governing permissions 
and limitations under
- * the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.geode.distributed.internal.membership.gms.fd;
 
-import static 
org.apache.geode.internal.DataSerializableFixedID.HEARTBEAT_REQUEST;
-import static 
org.apache.geode.internal.DataSerializableFixedID.HEARTBEAT_RESPONSE;
-import static 
org.apache.geode.internal.DataSerializableFixedID.SUSPECT_MEMBERS_MESSAGE;
+import static org.apache.geode.internal.DataSerializableFixedID.*;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -27,7 +27,19 @@ import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
@@ -38,7 +50,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.*;
+import java.util.stream.Collectors;
 
 import org.apache.logging.log4j.Logger;
 import org.jgroups.util.UUID;
@@ -67,23 +79,25 @@ import 
org.apache.geode.internal.security.SecurableCommunicationChannel;
 
 /**
  * Failure Detection
- * 
- * This class make sure that each member is alive and communicating to this 
member. To make sure
- * that we create the ring of members based on current view. On this ring, 
each member make sure
- * that next-member in ring is communicating with it. For that we record last 
message timestamp from
- * next-member. And if it sees this member has not communicated in last 
period(member-timeout) then
- * we check whether this member is still alive or not. Based on that we 
informed probable
- * coordinators to remove that member from view.
- * 
- * It has {@link #suspect(InternalDistributedMember, String)} api, which can 
be used to initiate
- * suspect processing for any member. First is checks whether the member is 
responding or not. Then
- * it informs probable coordinators to remove that member from view.
- * 
- * It has {@link #checkIfAvailable(DistributedMember, String, boolean)} api to 
see if that member is
- * alive. Then based on removal flag it initiates the suspect processing for 
that member.
- * 
+ * <p>
+ * This class make sure that each member is alive and communicating to this 
member.
+ * To make sure that we create the ring of members based on current view. On 
this
+ * ring, each member make sure that next-member in ring is communicating with 
it.
+ * For that we record last message timestamp from next-member. And if it sees 
this
+ * member has not communicated in last period(member-timeout) then we check 
whether
+ * this member is still alive or not. Based on that we informed probable 
coordinators
+ * to remove that member from view.
+ * <p>
+ * It has {@link #suspect(InternalDistributedMember, String)} api, which can 
be used
+ * to initiate suspect processing for any member. First is checks whether the 
member is
+ * responding or not. Then it informs probable coordinators to remove that 
member from
+ * view.
+ * <p>
+ * It has {@link #checkIfAvailable(DistributedMember, String, boolean)} api to 
see
+ * if that member is alive. Then based on removal flag it initiates the 
suspect processing
+ * for that member.
  */
-@SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", 
"NullableProblems"})
+@SuppressWarnings({ "SynchronizationOnLocalVariableOrMethodParameter", 
"NullableProblems" })
 public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
 
   private Services services;
@@ -94,49 +108,50 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
   volatile private boolean isStopping = false;
   private final AtomicInteger requestId = new AtomicInteger();
 
-  /** membership logger */
+  /**
+   * membership logger
+   */
   private static final Logger logger = Services.getLogger();
 
   /**
-   * The number of recipients of periodic heartbeats. The recipients will be 
selected from the
-   * members that are likely to be monitoring this member.
+   * The number of recipients of periodic heartbeats.  The recipients will
+   * be selected from the members that are likely to be monitoring this member.
    */
   private static final int NUM_HEARTBEATS = 
Integer.getInteger("geode.heartbeat-recipients", 2);
 
   /**
-   * Member activity will be recorded per interval/period. Timer task will set 
interval's starting
-   * time. Each interval will be member-timeout/LOGICAL_INTERVAL. 
LOGICAL_INTERVAL may be configured
+   * Member activity will be recorded per interval/period. Timer task will set 
interval's starting time.
+   * Each interval will be member-timeout/LOGICAL_INTERVAL. LOGICAL_INTERVAL 
may be configured
    * via a system property with a default of 2. At least 1 interval is needed.
    */
-  public static final int LOGICAL_INTERVAL =
-      Integer.getInteger("geode.logical-message-received-interval", 2);
+  public static final int LOGICAL_INTERVAL = 
Integer.getInteger("geode.logical-message-received-interval", 2);
 
-  /** stall time to wait for members leaving concurrently */
-  public static final long MEMBER_SUSPECT_COLLECTION_INTERVAL =
-      Long.getLong("geode.suspect-member-collection-interval", 200);
+  /**
+   * stall time to wait for members leaving concurrently
+   */
+  public static final long MEMBER_SUSPECT_COLLECTION_INTERVAL = 
Long.getLong("geode.suspect-member-collection-interval", 200);
 
   private volatile long currentTimeStamp;
 
-  /** this member's ID */
+  /**
+   * this member's ID
+   */
   private InternalDistributedMember localAddress;
 
   /**
    * Timestamp at which we last had contact from a member
    */
-  final ConcurrentMap<InternalDistributedMember, TimeStamp> memberTimeStamps =
-      new ConcurrentHashMap<>();
+  final ConcurrentMap<InternalDistributedMember, TimeStamp> memberTimeStamps = 
new ConcurrentHashMap<>();
 
   /**
    * Members currently being suspected and the view they were suspected in
    */
-  final private ConcurrentHashMap<InternalDistributedMember, NetView> 
suspectedMemberInView =
-      new ConcurrentHashMap<>();
+  final private ConcurrentHashMap<InternalDistributedMember, NetView> 
suspectedMemberInView = new ConcurrentHashMap<>();
 
   /**
    * Members undergoing final checks
    */
-  final private List<InternalDistributedMember> membersInFinalCheck =
-      Collections.synchronizedList(new ArrayList<>(30));
+  final private List<InternalDistributedMember> membersInFinalCheck = 
Collections.synchronizedList(new ArrayList<>(30));
 
   /**
    * Replies to messages
@@ -157,10 +172,14 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
    */
   private ScheduledFuture<?> monitorFuture;
 
-  /** test hook */
+  /**
+   * test hook
+   */
   private volatile boolean playingDead = false;
 
-  /** test hook */
+  /**
+   * test hook
+   */
   private volatile boolean beingSick = false;
 
   // For TCP check
@@ -170,13 +189,16 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
   private volatile int socketPort;
   private volatile ServerSocket serverSocket;
 
-  /** Statistics about health monitor */
+  /**
+   * Statistics about health monitor
+   */
   private DMStats stats;
 
   /**
    * this class is to avoid garbage
    */
   private static class TimeStamp {
+
     private volatile long timeStamp;
 
     TimeStamp(long timeStamp) {
@@ -193,14 +215,15 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
   }
 
   /***
-   * This class sets start interval timestamp to record the activity of all 
members. That is used by
-   * {@link GMSHealthMonitor#contactedBy(InternalDistributedMember)} to record 
the activity of
-   * member.
-   * 
+   * This class sets start interval timestamp to record the activity of all 
members.
+   * That is used by {@link 
GMSHealthMonitor#contactedBy(InternalDistributedMember)} to
+   * record the activity of member.
+   *
    * It initiates the suspect processing for next neighbour if it doesn't see 
any activity from that
    * member in last interval(member-timeout)
    */
   private class Monitor implements Runnable {
+
     final long memberTimeoutInMillis;
 
     public Monitor(long memberTimeout) {
@@ -217,7 +240,7 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
       InternalDistributedMember neighbour = nextNeighbor;
 
       long currentTime = System.currentTimeMillis();
-      // this is the start of interval to record member activity
+      //this is the start of interval to record member activity
       GMSHealthMonitor.this.currentTimeStamp = currentTime;
 
       if (neighbour != null) {
@@ -245,10 +268,11 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
 
   /***
    * Check thread waits on this object for response. It puts requestId in 
requestIdVsResponse map.
-   * Response will have requestId, which is used to get ResponseObject. Then 
it is used to notify
-   * waiting thread.
+   * Response will have requestId, which is used to get ResponseObject. Then 
it is used to
+   * notify waiting thread.
    */
   private class Response {
+
     private DistributionMessage responseMsg;
 
     public DistributionMessage getResponseMsg() {
@@ -288,8 +312,7 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
         int myVmViewId = gmbr.getVmViewId();
         if (playingDead) {
           logger.debug("HealthMonitor: simulating sick member in health 
check");
-        } else if (uuidLSBs == myUUID.getLeastSignificantBits()
-            && uuidMSBs == myUUID.getMostSignificantBits() && vmViewId == 
myVmViewId) {
+        } else if (uuidLSBs == myUUID.getLeastSignificantBits() && uuidMSBs == 
myUUID.getMostSignificantBits() && vmViewId == myVmViewId) {
           logger.debug("HealthMonitor: sending OK reply");
           out.write(OK);
           out.flush();
@@ -299,11 +322,7 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
           logger.debug("HealthMonitor: server replied OK.");
         } else {
           if (logger.isDebugEnabled()) {
-            logger.debug(
-                "HealthMonitor: sending ERROR reply - my UUID is {},{} 
received is {},{}.  My viewID is {} received is {}",
-                Long.toHexString(myUUID.getMostSignificantBits()),
-                Long.toHexString(myUUID.getLeastSignificantBits()), 
Long.toHexString(uuidMSBs),
-                Long.toHexString(uuidLSBs), myVmViewId, vmViewId);
+            logger.debug("HealthMonitor: sending ERROR reply - my UUID is 
{},{} received is {},{}.  My viewID is {} received is {}", 
Long.toHexString(myUUID.getMostSignificantBits()), 
Long.toHexString(myUUID.getLeastSignificantBits()), Long.toHexString(uuidMSBs), 
Long.toHexString(uuidLSBs), myVmViewId, vmViewId);
           }
           out.write(ERROR);
           out.flush();
@@ -338,7 +357,8 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
   }
 
   @SuppressWarnings("EmptyMethod")
-  public static void loadEmergencyClasses() {}
+  public static void loadEmergencyClasses() {
+  }
 
   /*
    * Record the member activity for current time interval.
@@ -354,6 +374,7 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
    */
   private void contactedBy(InternalDistributedMember sender, long timeStamp) {
     TimeStamp cTS = new TimeStamp(timeStamp);
+    //TODO Udo: why putIfAbsent. Surely only put is required
     cTS = memberTimeStamps.putIfAbsent(sender, cTS);
     if (cTS != null && cTS.getTime() < timeStamp) {
       cTS.setTime(timeStamp);
@@ -365,8 +386,7 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
   }
 
 
-  private HeartbeatRequestMessage constructHeartbeatRequestMessage(
-      final InternalDistributedMember mbr) {
+  private HeartbeatRequestMessage constructHeartbeatRequestMessage(final 
InternalDistributedMember mbr) {
     final int reqId = requestId.getAndIncrement();
     final HeartbeatRequestMessage hrm = new HeartbeatRequestMessage(mbr, 
reqId);
     hrm.setRecipient(mbr);
@@ -414,8 +434,8 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
   }
 
   /**
-   * This method sends heartbeat request to other member and waits for 
member-timeout time for
-   * response. If it doesn't see response then it returns false.
+   * This method sends heartbeat request to other member and waits for 
member-timeout
+   * time for response. If it doesn't see response then it returns false.
    */
   private boolean doCheckMember(InternalDistributedMember member, boolean 
waitForResponse) {
     if (playingDead || beingSick) {
@@ -466,9 +486,7 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
         }
       }
     } catch (InterruptedException e) {
-      logger.debug(
-          "GMSHealthMonitor checking thread interrupted, while waiting for 
response from member: {} .",
-          member);
+      logger.debug("GMSHealthMonitor checking thread interrupted, while 
waiting for response from member: {} .", member);
     } finally {
       if (waitForResponse) {
         requestIdVsResponse.remove(hrm.getRequestId());
@@ -478,23 +496,19 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
   }
 
   /**
-   * During final check, establish TCP connection between current member and 
suspect member. And
-   * exchange PING/PONG message to see if the suspect member is still alive.
-   * 
+   * During final check, establish TCP connection between current member and 
suspect member.
+   * And exchange PING/PONG message to see if the suspect member is still 
alive.
+   *
    * @param suspectMember member that does not respond to 
HeartbeatRequestMessage
+   *
    * @return true if successfully exchanged PING/PONG with TCP connection, 
otherwise false.
    */
   boolean doTCPCheckMember(InternalDistributedMember suspectMember, int port) {
     Socket clientSocket = null;
-    InternalDistributedSystem internalDistributedSystem =
-        InternalDistributedSystem.getConnectedInstance();
+    InternalDistributedSystem internalDistributedSystem = 
InternalDistributedSystem.getConnectedInstance();
     try {
-      logger.debug("Checking member {} with TCP socket connection {}:{}.", 
suspectMember,
-          suspectMember.getInetAddress(), port);
-      clientSocket =
-          
SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER)
-              .connect(suspectMember.getInetAddress(), port, (int) 
memberTimeout,
-                  new ConnectTimeoutTask(services.getTimer(), memberTimeout), 
false, -1, false);
+      logger.debug("Checking member {} with TCP socket connection {}:{}.", 
suspectMember, suspectMember.getInetAddress(), port);
+      clientSocket = 
SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER).connect(suspectMember.getInetAddress(),
 port, (int) memberTimeout, new ConnectTimeoutTask(services.getTimer(), 
memberTimeout), false, -1, false);
       clientSocket.setTcpNoDelay(true);
       return doTCPCheckMember(suspectMember, clientSocket);
     } catch (IOException e) {
@@ -517,7 +531,7 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
     return false;
   }
 
-  // Package protected for testing purposes
+  //Package protected for testing purposes
   boolean doTCPCheckMember(InternalDistributedMember suspectMember, Socket 
clientSocket) {
     try {
       if (clientSocket.isConnected()) {
@@ -531,8 +545,7 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
         logger.debug("Connected to suspect member - reading response");
         int b = in.read();
         if (logger.isDebugEnabled()) {
-          logger.debug("Received {}",
-              (b == OK ? "OK" : (b == ERROR ? "ERROR" : "unknown response: " + 
b)));
+          logger.debug("Received {}", (b == OK ? "OK" : (b == ERROR ? "ERROR" 
: "unknown response: " + b)));
         }
         if (b >= 0) {
           this.stats.incFinalCheckResponsesReceived();
@@ -545,7 +558,7 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
           }
           return true;
         } else {
-          // received ERROR
+          //received ERROR
           return false;
         }
       } else {// cannot establish TCP connection with suspect member
@@ -572,20 +585,19 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
   public void suspect(InternalDistributedMember mbr, String reason) {
     initiateSuspicion(mbr, reason);
     // Background suspect-collecting thread is currently disabled - it takes 
too long
-    // synchronized (suspectRequests) {
-    // SuspectRequest sr = new SuspectRequest((InternalDistributedMember) mbr, 
reason);
-    // if (!suspectRequests.contains(sr)) {
-    // logger.info("Suspecting member {}. Reason= {}.", mbr, reason);
-    // suspectRequests.add(sr);
-    // suspectRequests.notify();
-    // }
-    // }
+    //    synchronized (suspectRequests) {
+    //      SuspectRequest sr = new SuspectRequest((InternalDistributedMember) 
mbr, reason);
+    //      if (!suspectRequests.contains(sr)) {
+    //        logger.info("Suspecting member {}. Reason= {}.", mbr, reason);
+    //        suspectRequests.add(sr);
+    //        suspectRequests.notify();
+    //      }
+    //    }
   }
 
   @Override
   public boolean checkIfAvailable(DistributedMember mbr, String reason, 
boolean initiateRemoval) {
-    return inlineCheckIfAvailable(localAddress, currentView, initiateRemoval,
-        (InternalDistributedMember) mbr, reason);
+    return inlineCheckIfAvailable(localAddress, currentView, initiateRemoval, 
(InternalDistributedMember) mbr, reason);
   }
 
   public void start() {
@@ -601,8 +613,7 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
       @Override
       public Thread newThread(Runnable r) {
         int id = threadIdx.getAndIncrement();
-        Thread th =
-            new Thread(Services.getThreadGroup(), r, "Geode Failure Detection 
thread " + id);
+        Thread th = new Thread(Services.getThreadGroup(), r, "Geode Failure 
Detection thread " + id);
         th.setDaemon(true);
         return th;
       }
@@ -611,17 +622,16 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
     long delay = memberTimeout / LOGICAL_INTERVAL;
     monitorFuture = scheduler.scheduleAtFixedRate(m, delay, delay, 
TimeUnit.MILLISECONDS);
 
-    // suspectRequestCollectorThread = this.new 
RequestCollector<SuspectRequest>("Geode Suspect
-    // Message Collector", Services.getThreadGroup(), suspectRequests,
-    // new Callback<SuspectRequest>() {
-    // @Override
-    // public void process(List<SuspectRequest> requests) {
-    // GMSHealthMonitor.this.sendSuspectRequest(requests);
+    //    suspectRequestCollectorThread = this.new 
RequestCollector<SuspectRequest>("Geode Suspect Message Collector", 
Services.getThreadGroup(), suspectRequests,
+    //        new Callback<SuspectRequest>() {
+    //      @Override
+    //      public void process(List<SuspectRequest> requests) {
+    //        GMSHealthMonitor.this.sendSuspectRequest(requests);
     //
-    // }
-    // }, MEMBER_SUSPECT_COLLECTION_INTERVAL);
-    // suspectRequestCollectorThread.setDaemon(true);
-    // suspectRequestCollectorThread.start()
+    //      }
+    //    }, MEMBER_SUSPECT_COLLECTION_INTERVAL);
+    //    suspectRequestCollectorThread.setDaemon(true);
+    //    suspectRequestCollectorThread.start()
 
     serverSocketExecutor = Executors.newCachedThreadPool(new ThreadFactory() {
       final AtomicInteger threadIdx = new AtomicInteger();
@@ -629,8 +639,7 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
       @Override
       public Thread newThread(Runnable r) {
         int id = threadIdx.getAndIncrement();
-        Thread th =
-            new Thread(Services.getThreadGroup(), r, "Geode Failure Detection 
Server thread " + id);
+        Thread th = new Thread(Services.getThreadGroup(), r, "Geode Failure 
Detection Server thread " + id);
         th.setDaemon(true);
         return th;
       }
@@ -641,44 +650,33 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
   ServerSocket createServerSocket(InetAddress socketAddress, int[] portRange) {
     ServerSocket serverSocket;
     try {
-      serverSocket = SocketCreatorFactory
-          .getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER)
-          .createServerSocketUsingPortRange(socketAddress, 50/* backlog */, 
true/* isBindAddress */,
-              false/* useNIO */, 65536/* tcpBufferSize */, portRange, false);
+      serverSocket = 
SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER).createServerSocketUsingPortRange(socketAddress,
 50/*backlog*/, true/*isBindAddress*/, false/*useNIO*/, 65536/*tcpBufferSize*/, 
portRange, false);
       socketPort = serverSocket.getLocalPort();
     } catch (IOException | SystemConnectException e) {
-      throw new GemFireConfigException(
-          "Unable to allocate a failure detection port in the membership-port 
range", e);
+      throw new GemFireConfigException("Unable to allocate a failure detection 
port in the membership-port range", e);
     }
     return serverSocket;
   }
 
   /**
-   * start the thread that listens for tcp/ip connections and responds to 
connection attempts
+   * start the thread that listens for tcp/ip connections and responds
+   * to connection attempts
    */
   private void startTcpServer(ServerSocket ssocket) {
     // allocate a socket here so there are no race conditions between knowing 
the FD
     // socket port and joining the system
 
     serverSocketExecutor.execute(() -> {
-      logger.info("Started failure detection server thread on {}:{}.", 
ssocket.getInetAddress(),
-          socketPort);
+      logger.info("Started failure detection server thread on {}:{}.", 
ssocket.getInetAddress(), socketPort);
       Socket socket = null;
       try {
-        while (!services.getCancelCriterion().isCancelInProgress()
-            && !GMSHealthMonitor.this.isStopping) {
+        while (!services.getCancelCriterion().isCancelInProgress() && 
!GMSHealthMonitor.this.isStopping) {
           try {
             socket = ssocket.accept();
             if (GMSHealthMonitor.this.playingDead) {
               continue;
             }
-            serverSocketExecutor.execute(new ClientSocketHandler(socket)); // 
start(); [bruce] I'm
-                                                                           // 
seeing a lot of
-                                                                           // 
failures due to this
-                                                                           // 
thread not being
-                                                                           // 
created fast enough,
-                                                                           // 
sometimes as long as
-                                                                           // 
30 seconds
+            serverSocketExecutor.execute(new ClientSocketHandler(socket)); 
//start();  [bruce] I'm seeing a lot of failures due to this thread not being 
created fast enough, sometimes as long as 30 seconds
 
           } catch (RejectedExecutionException e) {
             // this can happen during shutdown
@@ -713,8 +711,8 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
   }
 
   /**
-   * start the thread that periodically sends a message to processes that 
might be watching this
-   * process
+   * start the thread that periodically sends a message to processes
+   * that might be watching this process
    */
   private void startHeartbeatThread() {
     checkExecutor.execute(new Runnable() {
@@ -762,7 +760,7 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
 
         int index = startIndex;
         int numSent = 0;
-        for (;;) {
+        for (; ; ) {
           index--;
           if (index < 0) {
             index = mbrs.size() - 1;
@@ -798,35 +796,34 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
     synchronized (viewVsSuspectedMembers) {
       viewVsSuspectedMembers.clear();
     }
-    for (Iterator<InternalDistributedMember> it = 
memberTimeStamps.keySet().iterator(); it
-        .hasNext();) {
+    for (Iterator<InternalDistributedMember> it = 
memberTimeStamps.keySet().iterator(); it.hasNext(); ) {
       if (!newView.contains(it.next())) {
         it.remove();
       }
     }
-    for (Iterator<InternalDistributedMember> it = 
suspectedMemberInView.keySet().iterator(); it
-        .hasNext();) {
+    for (Iterator<InternalDistributedMember> it = 
suspectedMemberInView.keySet().iterator(); it.hasNext(); ) {
       if (!newView.contains(it.next())) {
         it.remove();
       }
     }
-    // for (InternalDistributedMember mbr: newView.getMembers()) {
-    // if (!memberVsLastMsgTS.containsKey(mbr)) {
-    // CustomTimeStamp customTS = new 
CustomTimeStamp(System.currentTimeMillis());
-    // memberVsLastMsgTS.put(mbr, customTS);
-    // }
-    // }
+    //    for (InternalDistributedMember mbr: newView.getMembers()) {
+    //      if (!memberVsLastMsgTS.containsKey(mbr)) {
+    //        CustomTimeStamp customTS = new 
CustomTimeStamp(System.currentTimeMillis());
+    //        memberVsLastMsgTS.put(mbr, customTS);
+    //      }
+    //    }
     currentView = newView;
     setNextNeighbor(newView, null);
   }
 
   /***
    * This method sets next neighbour which it needs to watch in current view.
-   * 
-   * if nextTo == null then it watches member next to it.
-   * 
-   * It becomes null when we suspect current neighbour, during that time it 
watches member next to
-   * suspect member.
+   *
+   * if nextTo == null
+   * then it watches member next to it.
+   *
+   * It becomes null when we suspect current neighbour, during that time it 
watches
+   * member next to suspect member.
    */
   private synchronized void setNextNeighbor(NetView newView, 
InternalDistributedMember nextTo) {
     if (newView == null) {
@@ -838,13 +835,31 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
 
     List<InternalDistributedMember> allMembers = newView.getMembers();
 
-    Set<InternalDistributedMember> checkAllSuspected = new 
HashSet<>(allMembers);
-    checkAllSuspected.removeAll(suspectedMemberInView.keySet());
-    checkAllSuspected.remove(localAddress);
-    if (checkAllSuspected.isEmpty() && allMembers.size() > 1) {
-      logger.info("All other members are suspect at this point");
-      nextNeighbor = null;
-      return;
+    //    Set<InternalDistributedMember> checkAllSuspected = new 
HashSet<>(allMembers);
+    //    checkAllSuspected.removeAll(suspectedMemberInView.keySet());
+    //    checkAllSuspected.remove(localAddress);
+    //    if (checkAllSuspected.isEmpty() && allMembers.size() > 1) {
+    //      logger.info("All other members are suspect at this point");
+    //      nextNeighbor = null;
+    //      return;
+    //    }
+
+    if (allMembers.size() > 1 && suspectedMemberInView.size() >= 
allMembers.size() - 1) {
+      boolean nonSuspectFound = false;
+      for (InternalDistributedMember member : allMembers) {
+        if (member.equals(localAddress)) {
+          continue;
+        }
+        if (!suspectedMemberInView.containsKey(member)) {
+          nonSuspectFound = true;
+          break;
+        }
+      }
+      if (!nonSuspectFound) {
+        logger.info("All other members are suspect at this point");
+        nextNeighbor = null;
+        return;
+      }
     }
 
     int index = allMembers.indexOf(nextTo);
@@ -887,8 +902,7 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
   @Override
   public void started() {
     setLocalAddress(services.getMessenger().getMemberID());
-    serverSocket = createServerSocket(localAddress.getInetAddress(),
-        services.getConfig().getMembershipPortRange());
+    serverSocket = createServerSocket(localAddress.getInetAddress(), 
services.getConfig().getMembershipPortRange());
     startTcpServer(serverSocket);
     startHeartbeatThread();
   }
@@ -935,21 +949,19 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
-      logger.info("GMSHealthMonitor serverSocketExecutor is "
-          + (serverSocketExecutor.isTerminated() ? "terminated" : "not 
terminated"));
+      logger.info("GMSHealthMonitor serverSocketExecutor is " + 
(serverSocketExecutor.isTerminated() ? "terminated" : "not terminated"));
     }
 
-    // if (suspectRequestCollectorThread != null) {
-    // suspectRequestCollectorThread.shutdown();
-    // }
+    //    if (suspectRequestCollectorThread != null) {
+    //      suspectRequestCollectorThread.shutdown();
+    //    }
   }
 
   /***
    * test method
    */
   public boolean isShutdown() {
-    return scheduler.isShutdown() && checkExecutor.isShutdown()
-        && serverSocketExecutor.isShutdown() /* && 
!suspectRequestCollectorThread.isAlive() */;
+    return scheduler.isShutdown() && checkExecutor.isShutdown() && 
serverSocketExecutor.isShutdown() /*&& 
!suspectRequestCollectorThread.isAlive()*/;
   }
 
   /**
@@ -965,8 +977,8 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
   }
 
   @Override
-  public void memberSuspected(InternalDistributedMember initiator,
-      InternalDistributedMember suspect, String reason) {}
+  public void memberSuspected(InternalDistributedMember initiator, 
InternalDistributedMember suspect, String reason) {
+  }
 
   @Override
   public void beSick() {
@@ -1059,23 +1071,23 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
       contactedBy(m.getSender(), System.currentTimeMillis());
     } else {
       Response resp = requestIdVsResponse.get(m.getRequestId());
-      logger.trace("Got heartbeat from member {}. {}", m.getSender(),
-          (resp != null ? "Check thread still waiting" : "Check thread is not 
waiting"));
+      logger.trace("Got heartbeat from member {}. {}", m.getSender(), (resp != 
null ? "Check thread still waiting" : "Check thread is not waiting"));
       if (resp != null) {
         synchronized (resp) {
           resp.setResponseMsg(m);
           resp.notify();
         }
       }
-      // we got heartbeat lets update timestamp
+      //we got a heartbeat, so let's update the timestamp
       contactedBy(m.getSender(), System.currentTimeMillis());
     }
   }
 
   /**
-   * Process a Suspect request from another member. This may cause this member 
to become the new
-   * membership coordinator. it will to final check on that member and then it 
will send remove
-   * request for that member
+   * Process a Suspect request from another member. This may cause this member
+   * to become the new membership coordinator.
+   * It will do a final check on that member and then it will send a remove request
+   * for that member.
    */
   private void processSuspectMembersRequest(SuspectMembersMessage 
incomingRequest) {
 
@@ -1092,16 +1104,14 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
     InternalDistributedMember sender = incomingRequest.getSender();
     int viewId = sender.getVmViewId();
     if (cv.getViewId() >= viewId && !cv.contains(incomingRequest.getSender())) 
{
-      logger.info("Membership ignoring suspect request for " + incomingRequest 
+ " from non-member "
-          + incomingRequest.getSender());
-      services.getJoinLeave().remove(sender,
-          "this process is initiating suspect processing but is no longer a 
member");
+      logger.info("Membership ignoring suspect request for " + incomingRequest 
+ " from non-member " + incomingRequest.getSender());
+      services.getJoinLeave().remove(sender, "this process is initiating 
suspect processing but is no longer a member");
       return;
     }
 
     // take care of any suspicion of this member by sending a heartbeat back
     if (!playingDead) {
-      for (Iterator<SuspectRequest> it = 
incomingRequest.getMembers().iterator(); it.hasNext();) {
+      for (Iterator<SuspectRequest> it = 
incomingRequest.getMembers().iterator(); it.hasNext(); ) {
         SuspectRequest req = it.next();
         if (req.getSuspectMember().equals(localAddress)) {
           HeartbeatMessage message = new HeartbeatMessage(-1);
@@ -1120,11 +1130,10 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
 
     if (cv.getCoordinator().equals(localAddress)) {
       for (SuspectRequest req : incomingRequest.getMembers()) {
-        logger.info("received suspect message from {} for {}: {}", sender, 
req.getSuspectMember(),
-            req.getReason());
+        logger.info("received suspect message from {} for {}: {}", sender, 
req.getSuspectMember(), req.getReason());
       }
       checkIfAvailable(sender, sMembers, cv);
-    } // coordinator ends
+    }// coordinator ends
     else {
 
       NetView check = new NetView(cv, cv.getViewId() + 1);
@@ -1142,8 +1151,7 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
       if (coordinator != null && coordinator.equals(localAddress)) {
         // new coordinator
         for (SuspectRequest req : incomingRequest.getMembers()) {
-          logger.info("received suspect message from {} for {}: {}", sender, 
req.getSuspectMember(),
-              req.getReason());
+          logger.info("received suspect message from {} for {}: {}", sender, 
req.getSuspectMember(), req.getReason());
         }
         checkIfAvailable(sender, smbr, cv);
       } else {
@@ -1154,8 +1162,8 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
   }
 
   /***
-   * This method make sure that records suspectRequest. We need to make sure 
this on preferred
-   * coordinators, as elder coordinator might be in suspected list next.
+   * This method makes sure that suspect requests are recorded. We need to ensure
+   * this on preferred coordinators, as the elder coordinator might be suspected next.
    */
   private void recordSuspectRequests(List<SuspectRequest> sMembers, NetView 
cv) {
     // record suspect requests
@@ -1173,12 +1181,12 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
   }
 
   /**
-   * performs a "final" health check on the member. If failure-detection 
socket information is
-   * available for the member (in the view) then we attempt to connect to its 
socket and ask if it's
-   * the expected member. Otherwise we send a heartbeat request and wait for a 
reply.
+   * performs a "final" health check on the member.  If failure-detection
+   * socket information is available for the member (in the view) then
+   * we attempt to connect to its socket and ask if it's the expected member.
+   * Otherwise we send a heartbeat request and wait for a reply.
    */
-  private void checkIfAvailable(final InternalDistributedMember initiator,
-      List<SuspectRequest> sMembers, final NetView cv) {
+  private void checkIfAvailable(final InternalDistributedMember initiator, 
List<SuspectRequest> sMembers, final NetView cv) {
 
     for (final SuspectRequest sr : sMembers) {
       final InternalDistributedMember mbr = sr.getSuspectMember();
@@ -1193,13 +1201,13 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
 
       // suspectMemberInView is now set by the heartbeat monitoring code
       // to allow us to move on from watching members we've already
-      // suspected. Since that code is updating this collection we
+      // suspected.  Since that code is updating this collection we
       // cannot use it here as an indication that a member is currently
       // undergoing a final check.
-      // NetView view;
-      // view = suspectedMemberInView.putIfAbsent(mbr, cv);
+      //      NetView view;
+      //      view = suspectedMemberInView.putIfAbsent(mbr, cv);
 
-      // if (view == null || !view.equals(cv)) {
+      //      if (view == null || !view.equals(cv)) {
       final String reason = sr.getReason();
       logger.debug("Scheduling final check for member {}; reason={}", mbr, 
reason);
       // its a coordinator
@@ -1214,13 +1222,11 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
           GMSHealthMonitor.this.suspectedMemberInView.remove(mbr);
         }
       });
-      // }// scheduling for final check and removing it..
+      //      }// scheduling for final check and removing it..
     }
   }
 
-  private boolean inlineCheckIfAvailable(final InternalDistributedMember 
initiator,
-      final NetView cv, boolean initiateRemoval, final 
InternalDistributedMember mbr,
-      final String reason) {
+  private boolean inlineCheckIfAvailable(final InternalDistributedMember 
initiator, final NetView cv, boolean initiateRemoval, final 
InternalDistributedMember mbr, final String reason) {
 
     if (services.getJoinLeave().isMemberLeaving(mbr)) {
       return false;
@@ -1242,8 +1248,7 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
       if (port <= 0) {
         logger.info("Unable to locate failure detection port - requesting a 
heartbeat");
         if (logger.isDebugEnabled()) {
-          logger.debug("\ncurrent view: {}\nports: {}", cv,
-              Arrays.toString(cv.getFailureDetectionPorts()));
+          logger.debug("\ncurrent view: {}\nports: {}", cv, 
Arrays.toString(cv.getFailureDetectionPorts()));
         }
         pinged = GMSHealthMonitor.this.doCheckMember(mbr, true);
         GMSHealthMonitor.this.stats.incFinalCheckRequestsSent();
@@ -1253,9 +1258,9 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
           GMSHealthMonitor.this.stats.incUdpFinalCheckResponsesReceived();
         }
       } else {
-        // this will just send heartbeat request, it will not wait for response
-        // if we will get heartbeat then it will change the timestamp, which 
we are
-        // checking below in case of tcp check failure..
+        //this will just send a heartbeat request; it will not wait for a response.
+        //if we get a heartbeat then it will change the timestamp, which we are
+        //checking below in case of a TCP check failure.
         doCheckMember(mbr, false);
         pinged = doTCPCheckMember(mbr, port);
       }
@@ -1269,8 +1274,7 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
           }
           failed = true;
         } else {
-          logger.info(
-              "Final check failed but detected recent message traffic for 
suspect member " + mbr);
+          logger.info("Final check failed but detected recent message traffic 
for suspect member " + mbr);
         }
       }
       if (!failed) {
@@ -1286,7 +1290,8 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
   }
 
   @Override
-  public void memberShutdown(DistributedMember mbr, String reason) {}
+  public void memberShutdown(DistributedMember mbr, String reason) {
+  }
 
   @Override
   public int getFailureDetectionPort() {
@@ -1295,28 +1300,25 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
 
   private void sendSuspectRequest(final List<SuspectRequest> requests) {
     // the background suspect-collector thread is currently disabled
-    // synchronized (suspectRequests) {
-    // if (suspectRequests.size() > 0) {
-    // for (SuspectRequest sr: suspectRequests) {
-    // if (!requests.contains(sr)) {
-    // requests.add(sr);
-    // }
-    // }
-    // suspectRequests.clear();
-    // }
-    // }
+    //    synchronized (suspectRequests) {
+    //      if (suspectRequests.size() > 0) {
+    //        for (SuspectRequest sr: suspectRequests) {
+    //          if (!requests.contains(sr)) {
+    //            requests.add(sr);
+    //          }
+    //        }
+    //        suspectRequests.clear();
+    //      }
+    //    }
     logger.debug("Sending suspect request for members {}", requests);
     List<InternalDistributedMember> recipients;
     if (currentView.size() > 4) {
       HashSet<InternalDistributedMember> filter = new HashSet<>();
-      for (Enumeration<InternalDistributedMember> e = 
suspectedMemberInView.keys(); e
-          .hasMoreElements();) {
+      for (Enumeration<InternalDistributedMember> e = 
suspectedMemberInView.keys(); e.hasMoreElements(); ) {
         filter.add(e.nextElement());
       }
-      filter.addAll(
-          
requests.stream().map(SuspectRequest::getSuspectMember).collect(Collectors.toList()));
-      recipients =
-          currentView.getPreferredCoordinators(filter, 
services.getJoinLeave().getMemberID(), 5);
+      
filter.addAll(requests.stream().map(SuspectRequest::getSuspectMember).collect(Collectors.toList()));
+      recipients = currentView.getPreferredCoordinators(filter, 
services.getJoinLeave().getMemberID(), 5);
     } else {
       recipients = currentView.getMembers();
     }
@@ -1336,6 +1338,7 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
   }
 
   private static class ConnectTimeoutTask extends TimerTask implements 
ConnectionWatcher {
+
     final Timer scheduler;
     Socket socket;
     final long timeout;


Reply via email to