Repository: incubator-geode
Updated Branches:
  refs/heads/develop 3046ea831 -> 15450f5c3


GEODE-1874: Checkin after code formatting refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/1661504f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/1661504f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/1661504f

Branch: refs/heads/develop
Commit: 1661504f2944870d59a8c545bba80dfc19a58b94
Parents: b0c6f05
Author: Udo Kohlmeyer <[email protected]>
Authored: Fri Oct 21 16:11:54 2016 -0700
Committer: Udo Kohlmeyer <[email protected]>
Committed: Tue Nov 8 07:09:19 2016 +1100

----------------------------------------------------------------------
 .../membership/gms/fd/GMSHealthMonitor.java     | 410 ++++++++++---------
 1 file changed, 227 insertions(+), 183 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1661504f/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index b3598cd..ec1d606 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -1,22 +1,46 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
  */
 package org.apache.geode.distributed.internal.membership.gms.fd;
 
-import static org.apache.geode.internal.DataSerializableFixedID.*;
+import static 
org.apache.geode.internal.DataSerializableFixedID.HEARTBEAT_REQUEST;
+import static 
org.apache.geode.internal.DataSerializableFixedID.HEARTBEAT_RESPONSE;
+import static 
org.apache.geode.internal.DataSerializableFixedID.SUSPECT_MEMBERS_MESSAGE;
+
+import org.apache.geode.CancelException;
+import org.apache.geode.GemFireConfigException;
+import org.apache.geode.SystemConnectException;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.distributed.internal.membership.NetView;
+import org.apache.geode.distributed.internal.membership.gms.GMSMember;
+import org.apache.geode.distributed.internal.membership.gms.Services;
+import 
org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor;
+import 
org.apache.geode.distributed.internal.membership.gms.interfaces.MessageHandler;
+import 
org.apache.geode.distributed.internal.membership.gms.messages.HeartbeatMessage;
+import 
org.apache.geode.distributed.internal.membership.gms.messages.HeartbeatRequestMessage;
+import 
org.apache.geode.distributed.internal.membership.gms.messages.SuspectMembersMessage;
+import 
org.apache.geode.distributed.internal.membership.gms.messages.SuspectRequest;
+import org.apache.geode.internal.ConnectionWatcher;
+import org.apache.geode.internal.Version;
+import org.apache.geode.internal.net.SocketCreatorFactory;
+import org.apache.geode.internal.security.SecurableCommunicationChannel;
+import org.apache.logging.log4j.Logger;
+import org.jgroups.util.UUID;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -52,52 +76,24 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
-import org.apache.logging.log4j.Logger;
-import org.jgroups.util.UUID;
-
-import org.apache.geode.CancelException;
-import org.apache.geode.GemFireConfigException;
-import org.apache.geode.SystemConnectException;
-import org.apache.geode.distributed.DistributedMember;
-import org.apache.geode.distributed.internal.DMStats;
-import org.apache.geode.distributed.internal.DistributionMessage;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
-import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.distributed.internal.membership.NetView;
-import org.apache.geode.distributed.internal.membership.gms.GMSMember;
-import org.apache.geode.distributed.internal.membership.gms.Services;
-import 
org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor;
-import 
org.apache.geode.distributed.internal.membership.gms.interfaces.MessageHandler;
-import 
org.apache.geode.distributed.internal.membership.gms.messages.HeartbeatMessage;
-import 
org.apache.geode.distributed.internal.membership.gms.messages.HeartbeatRequestMessage;
-import 
org.apache.geode.distributed.internal.membership.gms.messages.SuspectMembersMessage;
-import 
org.apache.geode.distributed.internal.membership.gms.messages.SuspectRequest;
-import org.apache.geode.internal.ConnectionWatcher;
-import org.apache.geode.internal.Version;
-import org.apache.geode.internal.net.SocketCreatorFactory;
-import org.apache.geode.internal.security.SecurableCommunicationChannel;
-
 /**
  * Failure Detection
  * <p>
- * This class make sure that each member is alive and communicating to this 
member.
- * To make sure that we create the ring of members based on current view. On 
this
- * ring, each member make sure that next-member in ring is communicating with 
it.
- * For that we record last message timestamp from next-member. And if it sees 
this
- * member has not communicated in last period(member-timeout) then we check 
whether
- * this member is still alive or not. Based on that we informed probable 
coordinators
- * to remove that member from view.
+ * This class makes sure that each member is alive and communicating with this member. To do
+ * that, we create a ring of members based on the current view. On this ring, each member makes
+ * sure that the next member in the ring is communicating with it. For that we record the
+ * timestamp of the last message from the next member. If that member has not communicated in the
+ * last period (member-timeout), we check whether it is still alive. Based on that, we inform
+ * the probable coordinators to remove that member from the view.
  * <p>
- * It has {@link #suspect(InternalDistributedMember, String)} api, which can 
be used
- * to initiate suspect processing for any member. First is checks whether the 
member is
- * responding or not. Then it informs probable coordinators to remove that 
member from
- * view.
+ * It has {@link #suspect(InternalDistributedMember, String)} api, which can 
be used to initiate
+ * suspect processing for any member. First it checks whether the member is 
responding or not. Then
+ * it informs probable coordinators to remove that member from view.
  * <p>
- * It has {@link #checkIfAvailable(DistributedMember, String, boolean)} api to 
see
- * if that member is alive. Then based on removal flag it initiates the 
suspect processing
- * for that member.
+ * It has {@link #checkIfAvailable(DistributedMember, String, boolean)} api to 
see if that member is
+ * alive. Then based on removal flag it initiates the suspect processing for 
that member.
  */
-@SuppressWarnings({ "SynchronizationOnLocalVariableOrMethodParameter", 
"NullableProblems" })
+@SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", 
"NullableProblems"})
 public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
 
   private Services services;
@@ -114,22 +110,24 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
   private static final Logger logger = Services.getLogger();
 
   /**
-   * The number of recipients of periodic heartbeats.  The recipients will
-   * be selected from the members that are likely to be monitoring this member.
+   * The number of recipients of periodic heartbeats. The recipients will be 
selected from the
+   * members that are likely to be monitoring this member.
    */
   private static final int NUM_HEARTBEATS = 
Integer.getInteger("geode.heartbeat-recipients", 2);
 
   /**
-   * Member activity will be recorded per interval/period. Timer task will set 
interval's starting time.
-   * Each interval will be member-timeout/LOGICAL_INTERVAL. LOGICAL_INTERVAL 
may be configured
+   * Member activity will be recorded per interval/period. Timer task will set 
interval's starting
+   * time. Each interval will be member-timeout/LOGICAL_INTERVAL. 
LOGICAL_INTERVAL may be configured
    * via a system property with a default of 2. At least 1 interval is needed.
    */
-  public static final int LOGICAL_INTERVAL = 
Integer.getInteger("geode.logical-message-received-interval", 2);
+  public static final int LOGICAL_INTERVAL =
+      Integer.getInteger("geode.logical-message-received-interval", 2);
 
   /**
    * stall time to wait for members leaving concurrently
    */
-  public static final long MEMBER_SUSPECT_COLLECTION_INTERVAL = 
Long.getLong("geode.suspect-member-collection-interval", 200);
+  public static final long MEMBER_SUSPECT_COLLECTION_INTERVAL =
+      Long.getLong("geode.suspect-member-collection-interval", 200);
 
   private volatile long currentTimeStamp;
 
@@ -141,17 +139,20 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
   /**
    * Timestamp at which we last had contact from a member
    */
-  final ConcurrentMap<InternalDistributedMember, TimeStamp> memberTimeStamps = 
new ConcurrentHashMap<>();
+  final ConcurrentMap<InternalDistributedMember, TimeStamp> memberTimeStamps =
+      new ConcurrentHashMap<>();
 
   /**
    * Members currently being suspected and the view they were suspected in
    */
-  final private ConcurrentHashMap<InternalDistributedMember, NetView> 
suspectedMemberInView = new ConcurrentHashMap<>();
+  final private ConcurrentHashMap<InternalDistributedMember, NetView> 
suspectedMemberInView =
+      new ConcurrentHashMap<>();
 
   /**
    * Members undergoing final checks
    */
-  final private List<InternalDistributedMember> membersInFinalCheck = 
Collections.synchronizedList(new ArrayList<>(30));
+  final private List<InternalDistributedMember> membersInFinalCheck =
+      Collections.synchronizedList(new ArrayList<>(30));
 
   /**
    * Replies to messages
@@ -215,9 +216,9 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
   }
 
   /***
-   * This class sets start interval timestamp to record the activity of all 
members.
-   * That is used by {@link 
GMSHealthMonitor#contactedBy(InternalDistributedMember)} to
-   * record the activity of member.
+   * This class sets start interval timestamp to record the activity of all 
members. That is used by
+   * {@link GMSHealthMonitor#contactedBy(InternalDistributedMember)} to record 
the activity of
+   * member.
    *
    * It initiates the suspect processing for next neighbour if it doesn't see 
any activity from that
    * member in last interval(member-timeout)
@@ -240,7 +241,7 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
       InternalDistributedMember neighbour = nextNeighbor;
 
       long currentTime = System.currentTimeMillis();
-      //this is the start of interval to record member activity
+      // this is the start of interval to record member activity
       GMSHealthMonitor.this.currentTimeStamp = currentTime;
 
       if (neighbour != null) {
@@ -268,8 +269,8 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
 
   /***
    * Check thread waits on this object for response. It puts requestId in 
requestIdVsResponse map.
-   * Response will have requestId, which is used to get ResponseObject. Then 
it is used to
-   * notify waiting thread.
+   * Response will have requestId, which is used to get ResponseObject. Then 
it is used to notify
+   * waiting thread.
    */
   private class Response {
 
@@ -312,7 +313,8 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
         int myVmViewId = gmbr.getVmViewId();
         if (playingDead) {
           logger.debug("HealthMonitor: simulating sick member in health 
check");
-        } else if (uuidLSBs == myUUID.getLeastSignificantBits() && uuidMSBs == 
myUUID.getMostSignificantBits() && vmViewId == myVmViewId) {
+        } else if (uuidLSBs == myUUID.getLeastSignificantBits()
+            && uuidMSBs == myUUID.getMostSignificantBits() && vmViewId == 
myVmViewId) {
           logger.debug("HealthMonitor: sending OK reply");
           out.write(OK);
           out.flush();
@@ -322,7 +324,11 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
           logger.debug("HealthMonitor: server replied OK.");
         } else {
           if (logger.isDebugEnabled()) {
-            logger.debug("HealthMonitor: sending ERROR reply - my UUID is 
{},{} received is {},{}.  My viewID is {} received is {}", 
Long.toHexString(myUUID.getMostSignificantBits()), 
Long.toHexString(myUUID.getLeastSignificantBits()), Long.toHexString(uuidMSBs), 
Long.toHexString(uuidLSBs), myVmViewId, vmViewId);
+            logger.debug(
+                "HealthMonitor: sending ERROR reply - my UUID is {},{} 
received is {},{}.  My viewID is {} received is {}",
+                Long.toHexString(myUUID.getMostSignificantBits()),
+                Long.toHexString(myUUID.getLeastSignificantBits()), 
Long.toHexString(uuidMSBs),
+                Long.toHexString(uuidLSBs), myVmViewId, vmViewId);
           }
           out.write(ERROR);
           out.flush();
@@ -385,7 +391,8 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
   }
 
 
-  private HeartbeatRequestMessage constructHeartbeatRequestMessage(final 
InternalDistributedMember mbr) {
+  private HeartbeatRequestMessage constructHeartbeatRequestMessage(
+      final InternalDistributedMember mbr) {
     final int reqId = requestId.getAndIncrement();
     final HeartbeatRequestMessage hrm = new HeartbeatRequestMessage(mbr, 
reqId);
     hrm.setRecipient(mbr);
@@ -433,8 +440,8 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
   }
 
   /**
-   * This method sends heartbeat request to other member and waits for 
member-timeout
-   * time for response. If it doesn't see response then it returns false.
+   * This method sends heartbeat request to other member and waits for 
member-timeout time for
+   * response. If it doesn't see response then it returns false.
    */
   private boolean doCheckMember(InternalDistributedMember member, boolean 
waitForResponse) {
     if (playingDead || beingSick) {
@@ -485,7 +492,9 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
         }
       }
     } catch (InterruptedException e) {
-      logger.debug("GMSHealthMonitor checking thread interrupted, while 
waiting for response from member: {} .", member);
+      logger.debug(
+          "GMSHealthMonitor checking thread interrupted, while waiting for 
response from member: {} .",
+          member);
     } finally {
       if (waitForResponse) {
         requestIdVsResponse.remove(hrm.getRequestId());
@@ -495,19 +504,22 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
   }
 
   /**
-   * During final check, establish TCP connection between current member and 
suspect member.
-   * And exchange PING/PONG message to see if the suspect member is still 
alive.
-   *
+   * During final check, establish TCP connection between current member and 
suspect member. And
+   * exchange PING/PONG message to see if the suspect member is still alive.
    * @param suspectMember member that does not respond to 
HeartbeatRequestMessage
-   *
    * @return true if successfully exchanged PING/PONG with TCP connection, 
otherwise false.
    */
   boolean doTCPCheckMember(InternalDistributedMember suspectMember, int port) {
     Socket clientSocket = null;
-    InternalDistributedSystem internalDistributedSystem = 
InternalDistributedSystem.getConnectedInstance();
+    InternalDistributedSystem internalDistributedSystem =
+        InternalDistributedSystem.getConnectedInstance();
     try {
-      logger.debug("Checking member {} with TCP socket connection {}:{}.", 
suspectMember, suspectMember.getInetAddress(), port);
-      clientSocket = 
SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER).connect(suspectMember.getInetAddress(),
 port, (int) memberTimeout, new ConnectTimeoutTask(services.getTimer(), 
memberTimeout), false, -1, false);
+      logger.debug("Checking member {} with TCP socket connection {}:{}.", 
suspectMember,
+          suspectMember.getInetAddress(), port);
+      clientSocket =
+          
SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER)
+              .connect(suspectMember.getInetAddress(), port, (int) 
memberTimeout,
+                  new ConnectTimeoutTask(services.getTimer(), memberTimeout), 
false, -1, false);
       clientSocket.setTcpNoDelay(true);
       return doTCPCheckMember(suspectMember, clientSocket);
     } catch (IOException e) {
@@ -530,7 +542,7 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
     return false;
   }
 
-  //Package protected for testing purposes
+  // Package protected for testing purposes
   boolean doTCPCheckMember(InternalDistributedMember suspectMember, Socket 
clientSocket) {
     try {
       if (clientSocket.isConnected()) {
@@ -544,7 +556,8 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
         logger.debug("Connected to suspect member - reading response");
         int b = in.read();
         if (logger.isDebugEnabled()) {
-          logger.debug("Received {}", (b == OK ? "OK" : (b == ERROR ? "ERROR" 
: "unknown response: " + b)));
+          logger.debug("Received {}",
+              (b == OK ? "OK" : (b == ERROR ? "ERROR" : "unknown response: " + 
b)));
         }
         if (b >= 0) {
           this.stats.incFinalCheckResponsesReceived();
@@ -557,7 +570,7 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
           }
           return true;
         } else {
-          //received ERROR
+          // received ERROR
           return false;
         }
       } else {// cannot establish TCP connection with suspect member
@@ -584,19 +597,20 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
   public void suspect(InternalDistributedMember mbr, String reason) {
     initiateSuspicion(mbr, reason);
     // Background suspect-collecting thread is currently disabled - it takes 
too long
-    //    synchronized (suspectRequests) {
-    //      SuspectRequest sr = new SuspectRequest((InternalDistributedMember) 
mbr, reason);
-    //      if (!suspectRequests.contains(sr)) {
-    //        logger.info("Suspecting member {}. Reason= {}.", mbr, reason);
-    //        suspectRequests.add(sr);
-    //        suspectRequests.notify();
-    //      }
-    //    }
+    // synchronized (suspectRequests) {
+    // SuspectRequest sr = new SuspectRequest((InternalDistributedMember) mbr, 
reason);
+    // if (!suspectRequests.contains(sr)) {
+    // logger.info("Suspecting member {}. Reason= {}.", mbr, reason);
+    // suspectRequests.add(sr);
+    // suspectRequests.notify();
+    // }
+    // }
   }
 
   @Override
   public boolean checkIfAvailable(DistributedMember mbr, String reason, 
boolean initiateRemoval) {
-    return inlineCheckIfAvailable(localAddress, currentView, initiateRemoval, 
(InternalDistributedMember) mbr, reason);
+    return inlineCheckIfAvailable(localAddress, currentView, initiateRemoval,
+        (InternalDistributedMember) mbr, reason);
   }
 
   public void start() {
@@ -612,7 +626,8 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
       @Override
       public Thread newThread(Runnable r) {
         int id = threadIdx.getAndIncrement();
-        Thread th = new Thread(Services.getThreadGroup(), r, "Geode Failure 
Detection thread " + id);
+        Thread th =
+            new Thread(Services.getThreadGroup(), r, "Geode Failure Detection 
thread " + id);
         th.setDaemon(true);
         return th;
       }
@@ -621,16 +636,17 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
     long delay = memberTimeout / LOGICAL_INTERVAL;
     monitorFuture = scheduler.scheduleAtFixedRate(m, delay, delay, 
TimeUnit.MILLISECONDS);
 
-    //    suspectRequestCollectorThread = this.new 
RequestCollector<SuspectRequest>("Geode Suspect Message Collector", 
Services.getThreadGroup(), suspectRequests,
-    //        new Callback<SuspectRequest>() {
-    //      @Override
-    //      public void process(List<SuspectRequest> requests) {
-    //        GMSHealthMonitor.this.sendSuspectRequest(requests);
+    // suspectRequestCollectorThread = this.new 
RequestCollector<SuspectRequest>("Geode Suspect
+    // Message Collector", Services.getThreadGroup(), suspectRequests,
+    // new Callback<SuspectRequest>() {
+    // @Override
+    // public void process(List<SuspectRequest> requests) {
+    // GMSHealthMonitor.this.sendSuspectRequest(requests);
     //
-    //      }
-    //    }, MEMBER_SUSPECT_COLLECTION_INTERVAL);
-    //    suspectRequestCollectorThread.setDaemon(true);
-    //    suspectRequestCollectorThread.start()
+    // }
+    // }, MEMBER_SUSPECT_COLLECTION_INTERVAL);
+    // suspectRequestCollectorThread.setDaemon(true);
+    // suspectRequestCollectorThread.start()
 
     serverSocketExecutor = Executors.newCachedThreadPool(new ThreadFactory() {
       final AtomicInteger threadIdx = new AtomicInteger();
@@ -638,7 +654,8 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
       @Override
       public Thread newThread(Runnable r) {
         int id = threadIdx.getAndIncrement();
-        Thread th = new Thread(Services.getThreadGroup(), r, "Geode Failure 
Detection Server thread " + id);
+        Thread th =
+            new Thread(Services.getThreadGroup(), r, "Geode Failure Detection 
Server thread " + id);
         th.setDaemon(true);
         return th;
       }
@@ -649,33 +666,44 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
   ServerSocket createServerSocket(InetAddress socketAddress, int[] portRange) {
     ServerSocket serverSocket;
     try {
-      serverSocket = 
SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER).createServerSocketUsingPortRange(socketAddress,
 50/*backlog*/, true/*isBindAddress*/, false/*useNIO*/, 65536/*tcpBufferSize*/, 
portRange, false);
+      serverSocket = SocketCreatorFactory
+          .getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER)
+          .createServerSocketUsingPortRange(socketAddress, 50/* backlog */, 
true/* isBindAddress */,
+              false/* useNIO */, 65536/* tcpBufferSize */, portRange, false);
       socketPort = serverSocket.getLocalPort();
     } catch (IOException | SystemConnectException e) {
-      throw new GemFireConfigException("Unable to allocate a failure detection 
port in the membership-port range", e);
+      throw new GemFireConfigException(
+          "Unable to allocate a failure detection port in the membership-port 
range", e);
     }
     return serverSocket;
   }
 
   /**
-   * start the thread that listens for tcp/ip connections and responds
-   * to connection attempts
+   * start the thread that listens for tcp/ip connections and responds to 
connection attempts
    */
   private void startTcpServer(ServerSocket ssocket) {
     // allocate a socket here so there are no race conditions between knowing 
the FD
     // socket port and joining the system
 
     serverSocketExecutor.execute(() -> {
-      logger.info("Started failure detection server thread on {}:{}.", 
ssocket.getInetAddress(), socketPort);
+      logger.info("Started failure detection server thread on {}:{}.", 
ssocket.getInetAddress(),
+          socketPort);
       Socket socket = null;
       try {
-        while (!services.getCancelCriterion().isCancelInProgress() && 
!GMSHealthMonitor.this.isStopping) {
+        while (!services.getCancelCriterion().isCancelInProgress()
+            && !GMSHealthMonitor.this.isStopping) {
           try {
             socket = ssocket.accept();
             if (GMSHealthMonitor.this.playingDead) {
               continue;
             }
-            serverSocketExecutor.execute(new ClientSocketHandler(socket)); 
//start();  [bruce] I'm seeing a lot of failures due to this thread not being 
created fast enough, sometimes as long as 30 seconds
+            serverSocketExecutor.execute(new ClientSocketHandler(socket)); // 
start(); [bruce] I'm
+            // seeing a lot of
+            // failures due to this
+            // thread not being
+            // created fast enough,
+            // sometimes as long as
+            // 30 seconds
 
           } catch (RejectedExecutionException e) {
             // this can happen during shutdown
@@ -710,8 +738,8 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
   }
 
   /**
-   * start the thread that periodically sends a message to processes
-   * that might be watching this process
+   * start the thread that periodically sends a message to processes that 
might be watching this
+   * process
    */
   private void startHeartbeatThread() {
     checkExecutor.execute(new Runnable() {
@@ -795,22 +823,24 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
     synchronized (viewVsSuspectedMembers) {
       viewVsSuspectedMembers.clear();
     }
-    for (Iterator<InternalDistributedMember> it = 
memberTimeStamps.keySet().iterator(); it.hasNext(); ) {
+    for (Iterator<InternalDistributedMember> it = 
memberTimeStamps.keySet().iterator(); it
+        .hasNext(); ) {
       if (!newView.contains(it.next())) {
         it.remove();
       }
     }
-    for (Iterator<InternalDistributedMember> it = 
suspectedMemberInView.keySet().iterator(); it.hasNext(); ) {
+    for (Iterator<InternalDistributedMember> it = 
suspectedMemberInView.keySet().iterator(); it
+        .hasNext(); ) {
       if (!newView.contains(it.next())) {
         it.remove();
       }
     }
-    //    for (InternalDistributedMember mbr: newView.getMembers()) {
-    //      if (!memberVsLastMsgTS.containsKey(mbr)) {
-    //        CustomTimeStamp customTS = new 
CustomTimeStamp(System.currentTimeMillis());
-    //        memberVsLastMsgTS.put(mbr, customTS);
-    //      }
-    //    }
+    // for (InternalDistributedMember mbr: newView.getMembers()) {
+    // if (!memberVsLastMsgTS.containsKey(mbr)) {
+    // CustomTimeStamp customTS = new 
CustomTimeStamp(System.currentTimeMillis());
+    // memberVsLastMsgTS.put(mbr, customTS);
+    // }
+    // }
     currentView = newView;
     setNextNeighbor(newView, null);
   }
@@ -818,11 +848,10 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
   /***
    * This method sets next neighbour which it needs to watch in current view.
    *
-   * if nextTo == null
-   * then it watches member next to it.
+   * if nextTo == null then it watches member next to it.
    *
-   * It becomes null when we suspect current neighbour, during that time it 
watches
-   * member next to suspect member.
+   * It becomes null when we suspect current neighbour, during that time it 
watches member next to
+   * suspect member.
    */
   private synchronized void setNextNeighbor(NetView newView, 
InternalDistributedMember nextTo) {
     if (newView == null) {
@@ -834,14 +863,14 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
 
     List<InternalDistributedMember> allMembers = newView.getMembers();
     //
-    //    Set<InternalDistributedMember> checkAllSuspected = new 
HashSet<>(allMembers);
-    //    checkAllSuspected.removeAll(suspectedMemberInView.keySet());
-    //    checkAllSuspected.remove(localAddress);
-    //    if (checkAllSuspected.isEmpty() && allMembers.size() > 1) {
-    //      logger.info("All other members are suspect at this point");
-    //      nextNeighbor = null;
-    //      return;
-    //    }
+    // Set<InternalDistributedMember> checkAllSuspected = new 
HashSet<>(allMembers);
+    // checkAllSuspected.removeAll(suspectedMemberInView.keySet());
+    // checkAllSuspected.remove(localAddress);
+    // if (checkAllSuspected.isEmpty() && allMembers.size() > 1) {
+    // logger.info("All other members are suspect at this point");
+    // nextNeighbor = null;
+    // return;
+    // }
 
     if (allMembers.size() > 1 && suspectedMemberInView.size() >= 
allMembers.size() - 1) {
       boolean nonSuspectFound = false;
@@ -901,7 +930,8 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
   @Override
   public void started() {
     setLocalAddress(services.getMessenger().getMemberID());
-    serverSocket = createServerSocket(localAddress.getInetAddress(), 
services.getConfig().getMembershipPortRange());
+    serverSocket = createServerSocket(localAddress.getInetAddress(),
+        services.getConfig().getMembershipPortRange());
     startTcpServer(serverSocket);
     startHeartbeatThread();
   }
@@ -948,19 +978,21 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
-      logger.info("GMSHealthMonitor serverSocketExecutor is " + 
(serverSocketExecutor.isTerminated() ? "terminated" : "not terminated"));
+      logger.info("GMSHealthMonitor serverSocketExecutor is "
+          + (serverSocketExecutor.isTerminated() ? "terminated" : "not 
terminated"));
     }
 
-    //    if (suspectRequestCollectorThread != null) {
-    //      suspectRequestCollectorThread.shutdown();
-    //    }
+    // if (suspectRequestCollectorThread != null) {
+    // suspectRequestCollectorThread.shutdown();
+    // }
   }
 
   /***
    * test method
    */
   public boolean isShutdown() {
-    return scheduler.isShutdown() && checkExecutor.isShutdown() && 
serverSocketExecutor.isShutdown() /*&& 
!suspectRequestCollectorThread.isAlive()*/;
+    return scheduler.isShutdown() && checkExecutor.isShutdown()
+        && serverSocketExecutor.isShutdown() /* && 
!suspectRequestCollectorThread.isAlive() */;
   }
 
   /**
@@ -976,7 +1008,8 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
   }
 
   @Override
-  public void memberSuspected(InternalDistributedMember initiator, 
InternalDistributedMember suspect, String reason) {
+  public void memberSuspected(InternalDistributedMember initiator,
+                              InternalDistributedMember suspect, String 
reason) {
   }
 
   @Override
@@ -1067,7 +1100,8 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
     this.stats.incHeartbeatsReceived();
     if (m.getRequestId() >= 0) {
       Response resp = requestIdVsResponse.get(m.getRequestId());
-      logger.trace("Got heartbeat from member {}. {}", m.getSender(), (resp != 
null ? "Check thread still waiting" : "Check thread is not waiting"));
+      logger.trace("Got heartbeat from member {}. {}", m.getSender(),
+          (resp != null ? "Check thread still waiting" : "Check thread is not 
waiting"));
       if (resp != null) {
         synchronized (resp) {
           resp.setResponseMsg(m);
@@ -1076,15 +1110,14 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
       }
 
     }
-    //we got heartbeat lets update timestamp
+    // we got a heartbeat, so let's update the timestamp
     contactedBy(m.getSender(), System.currentTimeMillis());
   }
 
   /**
-   * Process a Suspect request from another member. This may cause this member
-   * to become the new membership coordinator.
-   * it will to final check on that member and then it will send remove request
-   * for that member
+   * Process a Suspect request from another member. This may cause this member to become the new
+   * membership coordinator. It will do a final check on that member and then it will send a
+   * remove request for that member.
    */
   private void processSuspectMembersRequest(SuspectMembersMessage 
incomingRequest) {
 
@@ -1101,8 +1134,10 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
     InternalDistributedMember sender = incomingRequest.getSender();
     int viewId = sender.getVmViewId();
     if (cv.getViewId() >= viewId && !cv.contains(incomingRequest.getSender())) 
{
-      logger.info("Membership ignoring suspect request for " + incomingRequest 
+ " from non-member " + incomingRequest.getSender());
-      services.getJoinLeave().remove(sender, "this process is initiating 
suspect processing but is no longer a member");
+      logger.info("Membership ignoring suspect request for " + incomingRequest 
+ " from non-member "
+          + incomingRequest.getSender());
+      services.getJoinLeave().remove(sender,
+          "this process is initiating suspect processing but is no longer a 
member");
       return;
     }
 
@@ -1124,13 +1159,13 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
       }
     }
 
-
     if (cv.getCoordinator().equals(localAddress)) {
       for (SuspectRequest req : incomingRequest.getMembers()) {
-        logger.info("received suspect message from {} for {}: {}", sender, 
req.getSuspectMember(), req.getReason());
+        logger.info("received suspect message from {} for {}: {}", sender, 
req.getSuspectMember(),
+            req.getReason());
       }
       checkIfAvailable(sender, sMembers, cv);
-    }// coordinator ends
+    } // coordinator ends
     else {
 
       NetView check = new NetView(cv, cv.getViewId() + 1);
@@ -1148,7 +1183,8 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
       if (coordinator != null && coordinator.equals(localAddress)) {
         // new coordinator
         for (SuspectRequest req : incomingRequest.getMembers()) {
-          logger.info("received suspect message from {} for {}: {}", sender, 
req.getSuspectMember(), req.getReason());
+          logger.info("received suspect message from {} for {}: {}", sender, 
req.getSuspectMember(),
+              req.getReason());
         }
         checkIfAvailable(sender, smbr, cv);
       } else {
@@ -1159,8 +1195,8 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
   }
 
   /***
-   * This method make sure that records suspectRequest. We need to make sure 
this
-   * on preferred coordinators, as elder coordinator might be in suspected 
list next. 
+   * This method makes sure that the suspect requests are recorded. We need to do this on the
+   * preferred coordinators, as the elder coordinator might be in the suspected list next.
    */
   private void recordSuspectRequests(List<SuspectRequest> sMembers, NetView 
cv) {
     // record suspect requests
@@ -1178,12 +1214,12 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
   }
 
   /**
-   * performs a "final" health check on the member.  If failure-detection
-   * socket information is available for the member (in the view) then
-   * we attempt to connect to its socket and ask if it's the expected member.
-   * Otherwise we send a heartbeat request and wait for a reply.
+   * performs a "final" health check on the member. If failure-detection 
socket information is
+   * available for the member (in the view) then we attempt to connect to its 
socket and ask if it's
+   * the expected member. Otherwise we send a heartbeat request and wait for a 
reply.
    */
-  private void checkIfAvailable(final InternalDistributedMember initiator, 
List<SuspectRequest> sMembers, final NetView cv) {
+  private void checkIfAvailable(final InternalDistributedMember initiator,
+                                List<SuspectRequest> sMembers, final NetView 
cv) {
 
     for (final SuspectRequest sr : sMembers) {
       final InternalDistributedMember mbr = sr.getSuspectMember();
@@ -1198,13 +1234,13 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
 
       // suspectMemberInView is now set by the heartbeat monitoring code
       // to allow us to move on from watching members we've already
-      // suspected.  Since that code is updating this collection we
+      // suspected. Since that code is updating this collection we
       // cannot use it here as an indication that a member is currently
       // undergoing a final check.
-      //      NetView view;
-      //      view = suspectedMemberInView.putIfAbsent(mbr, cv);
+      // NetView view;
+      // view = suspectedMemberInView.putIfAbsent(mbr, cv);
 
-      //      if (view == null || !view.equals(cv)) {
+      // if (view == null || !view.equals(cv)) {
       final String reason = sr.getReason();
       logger.debug("Scheduling final check for member {}; reason={}", mbr, 
reason);
       // its a coordinator
@@ -1219,11 +1255,14 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
           GMSHealthMonitor.this.suspectedMemberInView.remove(mbr);
         }
       });
-      //      }// scheduling for final check and removing it..
+      // }// scheduling for final check and removing it..
     }
   }
 
-  private boolean inlineCheckIfAvailable(final InternalDistributedMember 
initiator, final NetView cv, boolean initiateRemoval, final 
InternalDistributedMember mbr, final String reason) {
+  private boolean inlineCheckIfAvailable(final InternalDistributedMember 
initiator,
+                                         final NetView cv, boolean 
initiateRemoval,
+                                         final InternalDistributedMember mbr,
+                                         final String reason) {
 
     if (services.getJoinLeave().isMemberLeaving(mbr)) {
       return false;
@@ -1245,7 +1284,8 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
       if (port <= 0) {
         logger.info("Unable to locate failure detection port - requesting a 
heartbeat");
         if (logger.isDebugEnabled()) {
-          logger.debug("\ncurrent view: {}\nports: {}", cv, 
Arrays.toString(cv.getFailureDetectionPorts()));
+          logger.debug("\ncurrent view: {}\nports: {}", cv,
+              Arrays.toString(cv.getFailureDetectionPorts()));
         }
         pinged = GMSHealthMonitor.this.doCheckMember(mbr, true);
         GMSHealthMonitor.this.stats.incFinalCheckRequestsSent();
@@ -1255,9 +1295,9 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
           GMSHealthMonitor.this.stats.incUdpFinalCheckResponsesReceived();
         }
       } else {
-        //this will just send heartbeat request, it will not wait for response
-        //if we will get heartbeat then it will change the timestamp, which we 
are 
-        //checking below in case of tcp check failure..
+        // this will just send a heartbeat request; it will not wait for a response.
+        // if we get a heartbeat then it will change the timestamp, which we are
+        // checking below in case of tcp check failure.
         doCheckMember(mbr, false);
         pinged = doTCPCheckMember(mbr, port);
       }
@@ -1271,7 +1311,8 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
           }
           failed = true;
         } else {
-          logger.info("Final check failed but detected recent message traffic 
for suspect member " + mbr);
+          logger.info(
+              "Final check failed but detected recent message traffic for 
suspect member " + mbr);
         }
       }
       if (!failed) {
@@ -1297,25 +1338,28 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
 
   private void sendSuspectRequest(final List<SuspectRequest> requests) {
     // the background suspect-collector thread is currently disabled
-    //    synchronized (suspectRequests) {
-    //      if (suspectRequests.size() > 0) {
-    //        for (SuspectRequest sr: suspectRequests) {
-    //          if (!requests.contains(sr)) {
-    //            requests.add(sr);
-    //          }
-    //        }
-    //        suspectRequests.clear();
-    //      }
-    //    }
+    // synchronized (suspectRequests) {
+    // if (suspectRequests.size() > 0) {
+    // for (SuspectRequest sr: suspectRequests) {
+    // if (!requests.contains(sr)) {
+    // requests.add(sr);
+    // }
+    // }
+    // suspectRequests.clear();
+    // }
+    // }
     logger.debug("Sending suspect request for members {}", requests);
     List<InternalDistributedMember> recipients;
     if (currentView.size() > 4) {
       HashSet<InternalDistributedMember> filter = new HashSet<>();
-      for (Enumeration<InternalDistributedMember> e = 
suspectedMemberInView.keys(); e.hasMoreElements(); ) {
+      for (Enumeration<InternalDistributedMember> e = 
suspectedMemberInView.keys(); e
+          .hasMoreElements(); ) {
         filter.add(e.nextElement());
       }
-      
filter.addAll(requests.stream().map(SuspectRequest::getSuspectMember).collect(Collectors.toList()));
-      recipients = currentView.getPreferredCoordinators(filter, 
services.getJoinLeave().getMemberID(), 5);
+      filter.addAll(
+          
requests.stream().map(SuspectRequest::getSuspectMember).collect(Collectors.toList()));
+      recipients =
+          currentView.getPreferredCoordinators(filter, 
services.getJoinLeave().getMemberID(), 5);
     } else {
       recipients = currentView.getMembers();
     }


Reply via email to