[GEODE-77] TCP check for final check in health monitor
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/63802dab Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/63802dab Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/63802dab Branch: refs/heads/feature/GEODE-77 Commit: 63802dab6090eef2721620afe518e1ad0b65df5d Parents: 5feab82 Author: Jianxia Chen <[email protected]> Authored: Fri Oct 23 13:14:59 2015 -0700 Committer: Jianxia Chen <[email protected]> Committed: Fri Oct 23 13:32:38 2015 -0700 ---------------------------------------------------------------------- .../membership/gms/fd/GMSHealthMonitor.java | 240 ++++++- .../gms/interfaces/HealthMonitor.java | 18 + .../membership/gms/membership/GMSJoinLeave.java | 698 +++++++++---------- .../gms/messages/InstallViewMessage.java | 48 +- .../gms/messages/JoinRequestMessage.java | 14 +- .../gms/messages/JoinResponseMessage.java | 48 +- .../membership/GMSHealthMonitorJUnitTest.java | 14 +- .../sanctionedDataSerializables.txt | 12 +- 8 files changed, 725 insertions(+), 367 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/63802dab/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java index 1ca206f..774ab37 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java @@ -4,6 +4,15 @@ import static com.gemstone.gemfire.internal.DataSerializableFixedID.CHECK_REQUES import static com.gemstone.gemfire.internal.DataSerializableFixedID.CHECK_RESPONSE; import static com.gemstone.gemfire.internal.DataSerializableFixedID.SUSPECT_MEMBERS_MESSAGE; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -24,11 +33,13 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.logging.log4j.Logger; +import com.gemstone.gemfire.SystemConnectException; import com.gemstone.gemfire.distributed.DistributedMember; import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException; import com.gemstone.gemfire.distributed.internal.DistributionMessage; import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; import com.gemstone.gemfire.distributed.internal.membership.NetView; +import com.gemstone.gemfire.distributed.internal.membership.gms.GMSMember; import com.gemstone.gemfire.distributed.internal.membership.gms.Services; import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.HealthMonitor; import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.MessageHandler; @@ -36,6 +47,8 @@ import com.gemstone.gemfire.distributed.internal.membership.gms.messages.CheckRe import com.gemstone.gemfire.distributed.internal.membership.gms.messages.CheckResponseMessage; import com.gemstone.gemfire.distributed.internal.membership.gms.messages.SuspectMembersMessage; import com.gemstone.gemfire.distributed.internal.membership.gms.messages.SuspectRequest; +import com.gemstone.gemfire.internal.AvailablePort; +import com.gemstone.gemfire.internal.Version; import com.gemstone.gemfire.internal.concurrent.ConcurrentHashSet; /** @@ -118,6 +131,15 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { /** test hook */ boolean beingSick = false; + + // For TCP check + private ExecutorService serverSocketExecutor; + private static final int OK = 0x01; + private static final int ERROR = 0x02; + private InetAddress ip; + private volatile int socketPort; + private volatile ServerSocket serverSocket; + private Map<InternalDistributedMember, InetSocketAddress> socketInfo = new ConcurrentHashMap<InternalDistributedMember, InetSocketAddress>(); public GMSHealthMonitor() { @@ -329,6 +351,64 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { return false; } + /** + * During final check, establish TCP connection between current member and suspect member. + * And exchange PING/PONG message to see if the suspect member is still alive. + * + * @param suspectMember member that does not respond to CheckRequestMessage + * @return true if successfully exchanged PING/PONG with TCP connection, otherwise false. + */ + private boolean doTCPCheckMember(InternalDistributedMember suspectMember, InetSocketAddress addr) { + logger.trace("Checking member {} with TCP socket connection.", suspectMember); + Socket clientSocket = new Socket(); + try { + // establish TCP connection + for (Map.Entry<InternalDistributedMember, InetSocketAddress> entry : socketInfo.entrySet()) { + logger.info("socketInfo member:" + entry.getKey() + " port:" + entry.getValue().getPort()); + } + logger.debug("Checking member {} with TCP socket connection {}:{}.", suspectMember, addr.getAddress(), addr.getPort()); + clientSocket.connect(addr, (int) services.getConfig().getMemberTimeout()); + if (clientSocket.isConnected()) { + clientSocket.setSoTimeout((int) services.getConfig().getMemberTimeout()); + InputStream in = clientSocket.getInputStream(); + DataOutputStream out = new DataOutputStream(clientSocket.getOutputStream()); + logger.info("TCP check: suspect member uuid: " + ((GMSMember) suspectMember.getNetMember()).getUUID()); + out.writeShort(Version.CURRENT_ORDINAL); + out.writeLong(((GMSMember) suspectMember.getNetMember()).getUuidLSBs()); + out.writeLong(((GMSMember) suspectMember.getNetMember()).getUuidMSBs()); + out.flush(); + clientSocket.shutdownOutput(); + logger.debug("Send suspect member uuid to member {} with TCP socket connection.", suspectMember); + int b = in.read(); + logger.debug("Received {} from member {} with TCP socket connection.", (b == OK ? "OK" : (b == ERROR ? "ERROR" : b)), suspectMember); + if (b == OK) { + CustomTimeStamp ts = memberVsLastMsgTS.get(suspectMember); + if (ts != null) { + ts.setTimeStamp(System.currentTimeMillis()); + } + return true; + } else { + //received ERROR + return false; + } + } else {// cannot establish TCP connection with suspect member + return false; + } + } catch (IOException e) { + logger.trace("Unexpected exception", e); + } finally { + try { + if (clientSocket != null) { + clientSocket.close(); + } + } catch (IOException e) { + logger.trace("Unexpected exception", e); + } + } + + return false; + } + /* * (non-Javadoc) * @@ -357,7 +437,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } public void start() { - { + { scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { @@ -398,6 +478,123 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { suspectRequestCollectorThread.setDaemon(true); suspectRequestCollectorThread.start(); } + + { + serverSocketExecutor = Executors.newCachedThreadPool(new ThreadFactory() { + AtomicInteger threadIdx = new AtomicInteger(); + + @Override + public Thread newThread(Runnable r) { + int id = threadIdx.getAndIncrement(); + Thread th = new Thread(Services.getThreadGroup(), r, "TCP Check ServerSocket Thread " + id); + th.setDaemon(true); + return th; + } + }); + + serverSocketExecutor.execute(new Runnable() { + @Override + public void run() { + Socket socket = null; + try { + // start server socket for TCP check + if (serverSocket == null) { + localAddress = services.getMessenger().getMemberID(); + ip = localAddress.getInetAddress(); + int[] portRange = services.getConfig().getMembershipPortRange(); + socketPort = AvailablePort.getAvailablePortInRange(portRange[0], portRange[1], AvailablePort.SOCKET); + if (socketPort == -1) { + throw new SystemConnectException("Unable to find a free port in the membership port range"); + } + serverSocket = new ServerSocket(); + serverSocket.bind(new InetSocketAddress(ip, socketPort)); + logger.info("GMSHealthMonitor started server socket on {}:{}.", ip, socketPort); + socketInfo.put(localAddress, new InetSocketAddress(ip, socketPort)); + while (!services.getCancelCriterion().isCancelInProgress() + && !GMSHealthMonitor.this.isStopping) { + try { + socket = serverSocket.accept(); + if (GMSHealthMonitor.this.playingDead) { + continue; + } + socket.setSoTimeout((int) services.getConfig().getMemberTimeout()); + new ClientSocketHandler(socket).start(); + } catch (IOException e) { + logger.trace("Unexpected exception", e); + try { + if (socket != null) { + socket.close(); + } + } catch (IOException ioe) { + logger.trace("Unexpected exception", ioe); + } + } + } + logger.info("GMSHealthMonitor server socket has done its jobs."); + } + } catch (IOException e) { + logger.trace("Unexpected exception", e); + } finally { + // close the server socket + if (serverSocket != null && !serverSocket.isClosed()) { + try { + serverSocket.close(); + serverSocket = null; + logger.info("GMSHealthMonitor server socket closed."); + } catch (IOException e) { + logger.debug("Unexpected exception", e); + } + } + } + } + }); + } + } + + class ClientSocketHandler extends Thread { + + private Socket socket; + + public ClientSocketHandler(Socket socket) { + super(services.getThreadGroup(), "ClientSocketHandler"); + this.socket = socket; + setDaemon(true); + } + + public void run() { + try { + DataInputStream in = new DataInputStream(socket.getInputStream()); + OutputStream out = socket.getOutputStream(); + short version = in.readShort(); + long uuidLSBs = in.readLong(); + long uuidMSBs = in.readLong(); + logger.debug("GMSHealthMonitor server socket received {} and {}.", uuidMSBs, uuidLSBs); + logger.debug("GMSHealthMonitor member uuid is {}", ((GMSMember) GMSHealthMonitor.this.localAddress.getNetMember()).getUUID()); + if (uuidLSBs == ((GMSMember) GMSHealthMonitor.this.localAddress.getNetMember()).getUuidLSBs() + && uuidMSBs == ((GMSMember) GMSHealthMonitor.this.localAddress.getNetMember()).getUuidMSBs()) { + out.write(OK); + out.flush(); + socket.shutdownOutput(); + logger.debug("GMSHealthMonitor server socket replied OK."); + } + else { + out.write(ERROR); + out.flush(); + socket.shutdownOutput(); + logger.debug("GMSHealthMonitor server socket replied ERROR."); + } + } catch (IOException e) { + logger.trace("Unexpected exception", e); + } finally { + if (socket != null) { + try { + socket.close(); + } catch (IOException e) { + logger.info("Unexpected exception", e); + } + } + } + } } public synchronized void installView(NetView newView) { @@ -521,6 +718,21 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { checkExecutor.shutdown(); } + if (serverSocketExecutor != null) { + if (serverSocket != null && !serverSocket.isClosed()) { + try { + serverSocket.close(); + serverSocket = null; + logger.info("GMSHealthMonitor server socket is closed in stopServices()."); + } + catch (IOException e) { + logger.trace("Unexpected exception", e); + } + } + serverSocketExecutor.shutdownNow(); + logger.info("GMSHealthMonitor serverSocketExecutor is " + (serverSocketExecutor.isTerminated() ? "terminated" : "not terminated")); + } + if (suspectRequestCollectorThread != null) { suspectRequestCollectorThread.shutdown(); } @@ -530,7 +742,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { * test method */ public boolean isShutdown() { - return scheduler.isShutdown() && checkExecutor.isShutdown() && !suspectRequestCollectorThread.isAlive(); + return scheduler.isShutdown() && checkExecutor.isShutdown() && serverSocketExecutor.isShutdown() && !suspectRequestCollectorThread.isAlive(); } @Override @@ -752,7 +964,13 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { memberVsLastMsgTS.put(mbr, ts); logger.info("Performing final check for suspect member {} reason={}", mbr, reason); - boolean pinged = GMSHealthMonitor.this.doCheckMember(mbr); + boolean pinged; + InetSocketAddress addr = socketInfo.get(mbr); + if (addr == null || addr.getPort() < 0) { + pinged = GMSHealthMonitor.this.doCheckMember(mbr); + } else { + pinged = GMSHealthMonitor.this.doTCPCheckMember(mbr, addr); + } logger.info("Final check {}", pinged? "succeeded" : "failed"); if (!pinged && !isStopping) { @@ -919,4 +1137,20 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { // TODO Auto-generated method stub } + + public Map<InternalDistributedMember, InetSocketAddress> getSocketInfo() { + return this.socketInfo; + } + + public void installSocketInfo(List<InternalDistributedMember> members, List<Integer> portsForMembers) { + logger.debug("installSocketInfo members=" + members + " portsForMembers=" + portsForMembers); + for (int i = 0; i < members.size(); i++) { + if (portsForMembers.get(i).intValue() == -1) { + continue; + } + InetSocketAddress addr = new InetSocketAddress(members.get(i).getInetAddress(), portsForMembers.get(i).intValue()); + socketInfo.put(members.get(i), addr); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/63802dab/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/HealthMonitor.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/HealthMonitor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/HealthMonitor.java index 9ace2be..628e416 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/HealthMonitor.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/HealthMonitor.java @@ -1,7 +1,12 @@ package com.gemstone.gemfire.distributed.internal.membership.gms.interfaces; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Map; + import com.gemstone.gemfire.distributed.DistributedMember; import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; +import com.gemstone.gemfire.distributed.internal.membership.NetMember; public interface HealthMonitor extends Service { @@ -34,4 +39,17 @@ public interface HealthMonitor extends Service { * ShutdownMessage has been received from the given member */ public void memberShutdown(DistributedMember mbr, String reason); + + /** + * Returns a map that describes the members and their server sockets + */ + public Map<InternalDistributedMember, InetSocketAddress> getSocketInfo(); + + /** + * Update the information of the members and their server sockets + * + * @param members + * @param portsForMembers List of socket ports for each member + */ + public void installSocketInfo(List<InternalDistributedMember> members, List<Integer> portsForMembers); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/63802dab/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java index 5a792eb..6d39a6a 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java @@ -19,8 +19,10 @@ import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -28,8 +30,6 @@ import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.logging.log4j.Logger; @@ -72,26 +72,25 @@ import com.gemstone.gemfire.security.AuthenticationFailedException; public class GMSJoinLeave implements JoinLeave, MessageHandler { public static String BYPASS_DISCOVERY = "gemfire.bypass-discovery"; - + /** amount of time to wait for responses to FindCoordinatorRequests */ private static final int DISCOVERY_TIMEOUT = Integer.getInteger("gemfire.discovery-timeout", 3000); /** amount of time to sleep before trying to join after a failed attempt */ private static final int JOIN_RETRY_SLEEP = Integer.getInteger("gemfire.join-retry-sleep", 1000); - + /** stall time to wait for concurrent join/leave/remove requests to be received */ public static final long MEMBER_REQUEST_COLLECTION_INTERVAL = Long.getLong("gemfire.member-request-collection-interval", 500); /** time to wait for a leave request to be transmitted by jgroups */ private static final long LEAVE_MESSAGE_SLEEP_TIME = Long.getLong("gemfire.leave-message-sleep-time", 1000); - + /** if the locators don't know who the coordinator is we send find-coord requests to this many nodes */ private static final int MAX_DISCOVERY_NODES = Integer.getInteger("gemfire.max-discovery-nodes", 30); - + /** membership logger */ private static final Logger logger = Services.getLogger(); - /** the view ID where I entered into membership */ private int birthViewId; @@ -99,65 +98,65 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { private InternalDistributedMember localAddress; private Services services; - + /** have I connected to the distributed system? */ private volatile boolean isJoined; /** guarded by viewInstallationLock */ private boolean isCoordinator; - + /** a synch object that guards view installation */ private final Object viewInstallationLock = new Object(); /** the currently installed view. Guarded by viewInstallationLock */ private volatile NetView currentView; - + /** the previous view **/ private volatile NetView previousView; - + private final Set<InternalDistributedMember> removedMembers = new HashSet<>(); - + /** a new view being installed */ private NetView preparedView; - + /** the last view that conflicted with view preparation */ private NetView lastConflictingView; - + private List<InetSocketAddress> locators; - + /** a list of join/leave/crashes */ private final List<DistributionMessage> viewRequests = new LinkedList<DistributionMessage>(); /** collects the response to a join request */ private JoinResponseMessage[] joinResponse = new JoinResponseMessage[1]; - + /** collects responses to new views */ private ViewReplyProcessor viewProcessor = new ViewReplyProcessor(false); - + /** collects responses to view preparation messages */ private ViewReplyProcessor prepareProcessor = new ViewReplyProcessor(true); /** whether quorum checks can cause a forced-disconnect */ private boolean quorumRequired = false; - + /** timeout in receiving view acknowledgement */ private int viewAckTimeout; /** background thread that creates new membership views */ private ViewCreator viewCreator; - + /** am I shutting down? */ private volatile boolean isStopping; /** state of collected artifacts during discovery */ final SearchState searchState = new SearchState(); - + /** a collection used to detect unit testing */ Set<String> unitTesting = new HashSet<>(); - + /** the view where quorum was most recently lost */ NetView quorumLostView; - + static class SearchState { Set<InternalDistributedMember> alreadyTried = new HashSet<>(); Set<InternalDistributedMember> registrants = new HashSet<>(); @@ -167,12 +166,12 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { boolean hasContactedAJoinedLocator; NetView view; Set<FindCoordinatorResponse> responses = new HashSet<>(); - + void cleanup() { alreadyTried.clear(); possibleCoordinator = null; view = null; - synchronized(responses) { + synchronized (responses) { responses.clear(); } } @@ -191,7 +190,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { * @return true if successful, false if not */ public boolean join() { - + try { if (Boolean.getBoolean(BYPASS_DISCOVERY)) { synchronized(viewInstallationLock) { @@ -199,7 +198,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } return true; } - + SearchState state = searchState; long locatorWaitTime = services.getConfig().getLocatorWaitTime() * 1000; @@ -209,7 +208,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { long startTime = System.currentTimeMillis(); long locatorGiveUpTime = startTime + locatorWaitTime; long giveupTime = startTime + timeout; - + for (int tries=0; !this.isJoined; tries++) { logger.debug("searching for the membership coordinator"); boolean found = findCoordinator(); @@ -256,18 +255,18 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { return false; } } // for - + if (!this.isJoined) { logger.debug("giving up attempting to join the distributed system after " + (System.currentTimeMillis() - startTime) + "ms"); } - + // to preserve old behavior we need to throw a SystemConnectException if // unable to contact any of the locators if (!this.isJoined && state.hasContactedAJoinedLocator) { throw new SystemConnectException("Unable to join the distributed system in " + (System.currentTimeMillis()-startTime) + "ms"); } - + return this.isJoined; } finally { // notify anyone waiting on the address to be completed @@ -282,29 +281,31 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { /** * send a join request and wait for a reply. Process the reply. * This may throw a SystemConnectException or an AuthenticationFailedException + * * @param coord * @return true if the attempt succeeded, false if it timed out */ private boolean attemptToJoin() { SearchState state = searchState; - + // send a join request to the coordinator and wait for a response InternalDistributedMember coord = state.possibleCoordinator; logger.info("Attempting to join the distributed system through coordinator " + coord + " using address " + this.localAddress); - JoinRequestMessage req = new JoinRequestMessage(coord, this.localAddress, - services.getAuthenticator().getCredentials(coord)); - + JoinRequestMessage req = new JoinRequestMessage(coord, this.localAddress, services.getAuthenticator().getCredentials(coord)); + // add server socket port in the join request + if (services.getHealthMonitor().getSocketInfo().get(localAddress) != null) { + req.setSocketPort(services.getHealthMonitor().getSocketInfo().get(localAddress).getPort()); + } services.getMessenger().send(req); - + JoinResponseMessage response = null; - synchronized(joinResponse) { + synchronized (joinResponse) { if (joinResponse[0] == null) { try { // Note that if we give up waiting but a response is on // the way we will get the new view and join that way. // See installView() - long timeout = Math.max(services.getConfig().getMemberTimeout(), - services.getConfig().getJoinTimeout()/5); + long timeout = Math.max(services.getConfig().getMemberTimeout(), services.getConfig().getJoinTimeout() / 5); joinResponse.wait(timeout); } catch (InterruptedException e) { logger.debug("join attempt was interrupted"); @@ -319,7 +320,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { joinResponse[0] = null; String failReason = response.getRejectionMessage(); if (failReason != null) { - if (failReason.contains("Rejecting the attempt of a member using an older version") + if (failReason.contains("Rejecting the attempt of a member using an older version") || failReason.contains("15806")) { throw new SystemConnectException(failReason); } @@ -335,8 +336,9 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } else { this.birthViewId = response.getMemberID().getVmViewId(); this.localAddress.setVmViewId(this.birthViewId); - GMSMember me = (GMSMember)this.localAddress.getNetMember(); + GMSMember me = (GMSMember) this.localAddress.getNetMember(); me.setBirthViewId(birthViewId); + services.getHealthMonitor().installSocketInfo(response.getCurrentView().getMembers(), response.getPortsForMembers()); installView(response.getCurrentView()); } @@ -350,13 +352,13 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } return false; } - - + /** - * process a join request from another member. If this is the coordinator + * process a join request from another member. If this is the coordinator * this method will enqueue the request for processing in another thread. * If this is not the coordinator but the coordinator is known, the message * is forwarded to the coordinator. + * * @param incomingRequest */ private void processJoinRequest(JoinRequestMessage incomingRequest) { @@ -364,8 +366,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { logger.info("received join request from {}", incomingRequest.getMemberID()); if (incomingRequest.getMemberID().getVersionObject().compareTo(Version.CURRENT) < 0) { - logger.warn("detected an attempt to start a peer using an older version of the product {}", - incomingRequest.getMemberID()); + logger.warn("detected an attempt to start a peer using an older version of the product {}", incomingRequest.getMemberID()); JoinResponseMessage m = new JoinResponseMessage("Rejecting the attempt of a member using an older version"); m.setRecipient(incomingRequest.getMemberID()); services.getMessenger().send(m); @@ -379,26 +380,37 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { rejection = e.getMessage(); e.printStackTrace(); } - if (rejection != null && rejection.length() > 0) { + if (rejection != null && rejection.length() > 0) { JoinResponseMessage m = new JoinResponseMessage(rejection); m.setRecipient(incomingRequest.getMemberID()); services.getMessenger().send(m); return; } - - if (!this.localAddress.getNetMember().preferredForCoordinator() && + + if (!this.localAddress.getNetMember().preferredForCoordinator() && incomingRequest.getMemberID().getNetMember().preferredForCoordinator()) { JoinResponseMessage m = new JoinResponseMessage(incomingRequest.getMemberID(), currentView, true); + // add socket ports of all members to join response + List<Integer> portsForMembers = new ArrayList<Integer>(currentView.size()); + Map<InternalDistributedMember, InetSocketAddress> socketInfo = services.getHealthMonitor().getSocketInfo(); + for (InternalDistributedMember mbr : currentView.getMembers()) { + InetSocketAddress addr = socketInfo.get(mbr); + if (addr != null) { + portsForMembers.add(Integer.valueOf(addr.getPort())); + } else { + portsForMembers.add(Integer.valueOf(-1)); + } + } + m.setPortsForMembers(portsForMembers); services.getMessenger().send(m); return; } recordViewRequest(incomingRequest); } - - + /** - * Process a Leave request from another member. This may cause this member - * to become the new membership coordinator. If this is the coordinator + * Process a Leave request from another member. This may cause this member + * to become the new membership coordinator. If this is the coordinator * a new view will be triggered. * * @param incomingRequest @@ -406,31 +418,30 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { private void processLeaveRequest(LeaveRequestMessage incomingRequest) { logger.info("received leave request from {} for {}", incomingRequest.getSender(), incomingRequest.getMemberID()); - - + NetView v = currentView; InternalDistributedMember mbr = incomingRequest.getMemberID(); - + if (logger.isDebugEnabled()) { - logger.debug("JoinLeave.processLeaveRequest invoked. isCoordinator="+isCoordinator+ "; isStopping="+isStopping - +"; cancelInProgress="+services.getCancelCriterion().isCancelInProgress()); + logger.debug("JoinLeave.processLeaveRequest invoked. isCoordinator="+isCoordinator+ "; isStopping="+isStopping + +"; cancelInProgress="+ services.getCancelCriterion().isCancelInProgress()); } if (!v.contains(mbr) && mbr.getVmViewId() < v.getViewId()) { logger.debug("ignoring leave request from old member"); return; } - + if (incomingRequest.getMemberID().equals(this.localAddress)) { logger.info("I am being told to leave the distributed system"); forceDisconnect(incomingRequest.getReason()); } - + if (!isCoordinator && !isStopping && !services.getCancelCriterion().isCancelInProgress()) { logger.debug("JoinLeave is checking to see if I should become coordinator"); - NetView check = new NetView(v, v.getViewId()+1); + NetView check = new NetView(v, v.getViewId() + 1); check.remove(incomingRequest.getMemberID()); - synchronized(removedMembers) { + synchronized (removedMembers) { check.removeAll(removedMembers); check.addCrashedMembers(removedMembers); } @@ -439,8 +450,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { becomeCoordinator(incomingRequest.getMemberID()); } } - } - else { + } else { if (!isStopping && !services.getCancelCriterion().isCancelInProgress()) { recordViewRequest(incomingRequest); this.viewProcessor.processLeaveRequest(incomingRequest.getMemberID()); @@ -448,11 +458,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } } } - - + /** - * Process a Remove request from another member. This may cause this member - * to become the new membership coordinator. If this is the coordinator + * Process a Remove request from another member. This may cause this member + * to become the new membership coordinator. If this is the coordinator * a new view will be triggered. * * @param incomingRequest @@ -462,13 +471,13 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { InternalDistributedMember mbr = incomingRequest.getMemberID(); - if (v != null && !v.contains(incomingRequest.getSender())) { + if (v != null && !v.contains(incomingRequest.getSender())) { logger.info("Membership ignoring removal request for " + mbr + " from non-member " + incomingRequest.getSender()); return; } - + logger.info("Membership received a request to remove " + mbr - + " from " + incomingRequest.getSender() + + " from " + incomingRequest.getSender() + " reason="+incomingRequest.getReason()); if (mbr.equals(this.localAddress)) { @@ -476,16 +485,16 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { forceDisconnect(incomingRequest.getReason()); return; } - + if (getPendingRequestIDs(REMOVE_MEMBER_REQUEST).contains(mbr)) { logger.debug("ignoring request as I already have a removal request for this member"); return; } - + if (!isCoordinator && !isStopping && !services.getCancelCriterion().isCancelInProgress()) { logger.debug("JoinLeave is checking to see if I should become coordinator"); - NetView check = new NetView(v, v.getViewId()+1); - synchronized(removedMembers) { + NetView check = new NetView(v, v.getViewId() + 1); + synchronized (removedMembers) { removedMembers.add(mbr); check = new NetView(v, v.getViewId()); check.addCrashedMembers(removedMembers); @@ -496,8 +505,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { becomeCoordinator(mbr); } } - } - else { + } else { if (!isStopping && !services.getCancelCriterion().isCancelInProgress()) { recordViewRequest(incomingRequest); this.viewProcessor.processRemoveRequest(mbr); @@ -505,42 +513,41 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } } } - - + private void recordViewRequest(DistributionMessage request) { logger.debug("JoinLeave is recording the request to be processed in the next membership view"); - synchronized(viewRequests) { + synchronized (viewRequests) { viewRequests.add(request); viewRequests.notify(); } } - + // for testing purposes, returns a copy of the view requests for verification List<DistributionMessage> getViewRequests() { - synchronized(viewRequests) { + synchronized (viewRequests) { return new LinkedList<DistributionMessage>(viewRequests); } } - + // for testing purposes, returns the view-creation thread ViewCreator getViewCreator() { return viewCreator; } - + /** * Yippeee - I get to be the coordinator */ void becomeCoordinator() { // package access for unit testing becomeCoordinator(null); } - - + public void becomeCoordinatorForTest() { synchronized(viewInstallationLock) { becomeCoordinator(); } } + /** * Transitions this member into the coordinator role. This must * be invoked under a synch on viewInstallationLock that was held @@ -581,7 +588,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { if (this.localAddress.getVmViewId() < 0) { this.localAddress.setVmViewId(viewNumber); } - List<InternalDistributedMember> mbrs = new ArrayList<>(currentView.getMembers()); if (!mbrs.contains(localAddress)) { mbrs.add(localAddress); @@ -605,40 +611,75 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } } } - - private void sendJoinResponses(List<InternalDistributedMember> newMbrs, NetView newView) { - for (InternalDistributedMember mbr: newMbrs) { + + private void sendJoinResponses(List<InternalDistributedMember> newMbrs, NetView newView, List<Integer> portsForMembers) { + for (InternalDistributedMember mbr : newMbrs) { JoinResponseMessage response = new JoinResponseMessage(mbr, newView); + response.setPortsForMembers(portsForMembers); services.getMessenger().send(response); } } - - private void sendRemoveMessages(List<InternalDistributedMember> removals, - List<String> reasons, NetView newView) { + + private void sendRemoveMessages(List<InternalDistributedMember> removals, List<String> reasons, NetView newView) { Iterator<String> reason = reasons.iterator(); - for (InternalDistributedMember mbr: removals) { + for (InternalDistributedMember mbr : removals) { RemoveMemberMessage response = new RemoveMemberMessage(mbr, mbr, reason.next()); services.getMessenger().send(response); } } - - - boolean prepareView(NetView view, Collection<InternalDistributedMember> newMembers) { - return sendView(view, newMembers, true, this.prepareProcessor); + + boolean prepareView(NetView view, Collection<InternalDistributedMember> newMembers, List<DistributionMessage> requests) { + return sendView(view, newMembers, true, this.prepareProcessor, requests); } - - void sendView(NetView view, Collection<InternalDistributedMember> newMembers) { - sendView(view, newMembers, false, this.viewProcessor); + + void sendView(NetView view, Collection<InternalDistributedMember> newMembers, List<DistributionMessage> requests) { + sendView(view, newMembers, false, this.viewProcessor, requests); + } + + /** + * Build a list of socket ports for messages, e.g. InstallViewMessage, JoinResponseMessage + * @param view + * @param requests + * @return + */ + private void addPorts(NetView view, List<DistributionMessage> requests, List<Integer> portsForMembers) { + Map<InternalDistributedMember, InetSocketAddress> socketInfo = services.getHealthMonitor().getSocketInfo(); + Map<InternalDistributedMember, Integer> portMap = new ConcurrentHashMap<InternalDistributedMember, Integer>(); + for (DistributionMessage req : requests) { + if (req.getDSFID() == JOIN_REQUEST) { + JoinRequestMessage joinReq = (JoinRequestMessage) req; + portMap.put(joinReq.getMemberID(), Integer.valueOf(joinReq.getSocketPort())); + } + } + for (InternalDistributedMember mbr : view.getMembers()) { + InetSocketAddress addr = socketInfo.get(mbr); + if (addr != null) { + portsForMembers.add(Integer.valueOf(addr.getPort())); + } else { + Integer port = portMap.get(mbr); + if (port != null) { + portsForMembers.add(port); + } else { + portsForMembers.add(Integer.valueOf(-1)); + } + } + } } - - boolean sendView(NetView view, Collection<InternalDistributedMember> newMembers, boolean preparing, ViewReplyProcessor rp) { + boolean sendView(NetView view, Collection<InternalDistributedMember> newMembers, boolean preparing, ViewReplyProcessor rp, + List<DistributionMessage> requests) { int id = view.getViewId(); InstallViewMessage msg = new InstallViewMessage(view, services.getAuthenticator().getCredentials(this.localAddress), preparing); Set<InternalDistributedMember> recips = new HashSet<>(view.getMembers()); + // add socket ports of all members to InstallViewMessage + List<Integer> portsForMembers = new ArrayList<Integer>(view.size()); + if (requests != null) { + addPorts(view, requests, portsForMembers); + msg.setPortsForMembers(portsForMembers); + } // a recent member was seen not to receive a new view - I think this is why -// recips.removeAll(newMembers); // new members get the view in a JoinResponseMessage + // recips.removeAll(newMembers); // new members get the view in a JoinResponseMessage recips.remove(this.localAddress); // no need to send it to ourselves Set<InternalDistributedMember> responders = recips; @@ -649,19 +690,22 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { if (preparing) { this.preparedView = view; + if (requests != null) { + services.getHealthMonitor().installSocketInfo(view.getMembers(), portsForMembers); + } } else { installView(view); } - + if (recips.isEmpty()) { logger.info("no recipients for new view aside from myself"); return true; } - - logger.info((preparing? "preparing" : "sending") + " new view " + view); + + logger.info((preparing ? "preparing" : "sending") + " new view " + view); msg.setRecipients(recips); - + Set<InternalDistributedMember> pendingLeaves = getPendingRequestIDs(LEAVE_REQUEST_MESSAGE); Set<InternalDistributedMember> pendingRemovals = getPendingRequestIDs(REMOVE_MEMBER_REQUEST); pendingRemovals.removeAll(view.getCrashedMembers()); @@ -672,78 +716,73 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { // only wait for responses during preparation if (preparing) { logger.debug("waiting for view responses"); - + Set<InternalDistributedMember> failedToRespond = rp.waitForResponses(); logger.info("finished waiting for responses to view preparation"); - + InternalDistributedMember conflictingViewSender = rp.getConflictingViewSender(); NetView conflictingView = rp.getConflictingView(); if (conflictingView != null) { - logger.warn("received a conflicting membership view from " + conflictingViewSender + logger.warn("received a conflicting membership view from " + conflictingViewSender + " during preparation: " + conflictingView); return false; } - - if (!failedToRespond.isEmpty() && (services.getCancelCriterion().cancelInProgress() == null)) { + + if (!failedToRespond.isEmpty() && (services.getCancelCriterion().cancelInProgress() == null)) { logger.warn("these members failed to respond to the view change: " + failedToRespond); return false; } } - + return true; } - - private void processViewMessage(InstallViewMessage m) { - + logger.debug("Membership: processing {}", m); - + NetView view = m.getView(); - - if (currentView != null && view.getViewId() < currentView.getViewId()) { + + if (currentView != null && view.getViewId() < currentView.getViewId()) { // ignore old views ackView(m); return; } - - + if (m.isPreparing()) { if (this.preparedView != null && this.preparedView.getViewId() >= view.getViewId()) { services.getMessenger().send(new ViewAckMessage(m.getSender(), this.preparedView)); - } - else { + } else { this.preparedView = view; + if (!m.getPortsForMembers().isEmpty()) { + services.getHealthMonitor().installSocketInfo(view.getMembers(), m.getPortsForMembers()); + } ackView(m); } - } - else { // !preparing - if (currentView != null && !view.contains(this.localAddress)) { + } else { // !preparing + if (currentView != null && !view.contains(this.localAddress)) { if (quorumRequired) { forceDisconnect("This node is no longer in the membership view"); } - } - else { + } else { ackView(m); installView(view); } } } - + private void forceDisconnect(String reason) { this.isStopping = true; services.getManager().forceDisconnect(reason); } - private void ackView(InstallViewMessage m) { if (m.getView().contains(m.getView().getCreator())) { services.getMessenger().send(new ViewAckMessage(m.getSender(), m.getView().getViewId(), m.isPreparing())); } } - - + private void processViewAckMessage(ViewAckMessage m) { if (m.isPrepareAck()) { this.prepareProcessor.processViewResponse(m.getViewId(), m.getSender(), m.getAlternateView()); @@ -751,15 +790,15 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { this.viewProcessor.processViewResponse(m.getViewId(), m.getSender(), m.getAlternateView()); } } - + /** * This contacts the locators to find out who the current coordinator is. - * All locators are contacted. If they don't agree then we choose the oldest + * All locators are contacted. If they don't agree then we choose the oldest * coordinator and return it. */ private boolean findCoordinator() { SearchState state = searchState; - + assert this.localAddress != null; // If we've already tried to bootstrap from locators that @@ -769,7 +808,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { if ( !state.hasContactedAJoinedLocator && state.view != null) { return findCoordinatorFromView(); } - + FindCoordinatorRequest request = new FindCoordinatorRequest(this.localAddress, state.alreadyTried, state.viewId); Set<InternalDistributedMember> coordinators = new HashSet<InternalDistributedMember>(); @@ -778,14 +817,14 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { int connectTimeout = (int)services.getConfig().getMemberTimeout(); boolean anyResponses = false; boolean flagsSet = false; - + logger.debug("sending {} to {}", request, locators); state.hasContactedAJoinedLocator = false; state.locatorsContacted = 0; do { - for (InetSocketAddress addr: locators) { + for (InetSocketAddress addr : locators) { try { Object o = TcpClient.requestToServer( addr.getAddress(), addr.getPort(), request, connectTimeout, @@ -846,7 +885,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } InternalDistributedMember coord = null; boolean coordIsNoob = true; - for (; it.hasNext(); ) { + for (; it.hasNext();) { InternalDistributedMember mbr = it.next(); if (!state.alreadyTried.contains(mbr)) { boolean mbrIsNoob = (mbr.getVmViewId() < 0); @@ -866,7 +905,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } return true; } - + boolean findCoordinatorFromView() { ArrayList<FindCoordinatorResponse> result; SearchState state = searchState; @@ -883,8 +922,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { FindCoordinatorRequest req = new FindCoordinatorRequest(localAddress, state.alreadyTried, state.viewId); req.setRecipients(v.getMembers()); - boolean testing = unitTesting.contains("findCoordinatorFromView"); - synchronized(state.responses) { + boolean testing = unitTesting.contains("findCoordinatorFromView"); + synchronized (state.responses) { if (!testing) { state.responses.clear(); } @@ -900,7 +939,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { result = new ArrayList<>(state.responses); state.responses.clear(); } - + InternalDistributedMember coord = null; if (localAddress.getNetMember().preferredForCoordinator()) { // it's possible that all other potential coordinators are gone @@ -908,13 +947,13 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { coord = localAddress; } boolean coordIsNoob = true; - for (FindCoordinatorResponse resp: result) { + for (FindCoordinatorResponse resp : result) { InternalDistributedMember mbr = resp.getCoordinator(); if (!state.alreadyTried.contains(mbr)) { boolean mbrIsNoob = (mbr.getVmViewId() < 0); if (mbrIsNoob) { // member has not yet joined - if (coordIsNoob && (coord == null || coord.compareTo(mbr,false) > 0)) { + if (coordIsNoob && (coord == null || coord.compareTo(mbr, false) > 0)) { coord = mbr; } } else { @@ -926,11 +965,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } } } - + state.possibleCoordinator = coord; return coord != null; } - + /** * Some settings are gleaned from locator responses and set into the local * configuration @@ -938,11 +977,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { private void inheritSettingsFromLocator(InetSocketAddress addr, FindCoordinatorResponse response) { boolean enabled = response.isNetworkPartitionDetectionEnabled(); if (!enabled && services.getConfig().isNetworkPartitionDetectionEnabled()) { - throw new GemFireConfigException("locator at "+addr + throw new GemFireConfigException("locator at "+addr +" does not have network-partition-detection enabled but my configuration has it enabled"); } - GMSMember mbr = (GMSMember)this.localAddress.getNetMember(); + GMSMember mbr = (GMSMember) this.localAddress.getNetMember(); mbr.setSplitBrainEnabled(enabled); services.getConfig().setNetworkPartitionDetectionEnabled(enabled); services.getConfig().getDistributionConfig().setEnableNetworkPartitionDetection(enabled); @@ -950,28 +989,29 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { if (response.isUsePreferredCoordinators()) { this.quorumRequired = true; logger.debug("The locator indicates that all locators should be preferred as coordinators"); - if (services.getLocator() != null - || Locator.hasLocator() + if (services.getLocator() != null + || Locator.hasLocator() || !services.getConfig().getDistributionConfig().getStartLocator().isEmpty() || localAddress.getVmKind() == DistributionManager.LOCATOR_DM_TYPE) { - ((GMSMember)localAddress.getNetMember()).setPreferredForCoordinator(true); + ((GMSMember) localAddress.getNetMember()).setPreferredForCoordinator(true); } } else { - ((GMSMember)localAddress.getNetMember()).setPreferredForCoordinator(true); + ((GMSMember) localAddress.getNetMember()).setPreferredForCoordinator(true); } } - + /** * receives a JoinResponse holding a membership view or rejection message + * * @param rsp */ private void processJoinResponse(JoinResponseMessage rsp) { - synchronized(joinResponse) { + synchronized (joinResponse) { joinResponse[0] = rsp; joinResponse.notify(); } } - + private void processFindCoordinatorRequest(FindCoordinatorRequest req) { FindCoordinatorResponse resp; if (this.isJoined) { @@ -983,16 +1023,15 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { resp.setRecipient(req.getMemberID()); services.getMessenger().send(resp); } - + private void processFindCoordinatorResponse(FindCoordinatorResponse resp) { - synchronized(searchState.responses) { + synchronized (searchState.responses) { searchState.responses.add(resp); } } - + private void processNetworkPartitionMessage(NetworkPartitionMessage msg) { - String str = "Membership coordinator " - + msg.getSender() + " has declared that a network partition has occurred"; + String str = "Membership coordinator " + msg.getSender() + " has declared that a network partition has occurred"; forceDisconnect(str); } @@ -1000,7 +1039,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { public NetView getView() { return currentView; } - + public NetView getPreviousView() { return previousView; } @@ -1009,35 +1048,34 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { public InternalDistributedMember getMemberID() { return this.localAddress; } - + public void installView(NetView newView) { - + logger.info("received new view: {}\nold view is: {}", newView, currentView); - - synchronized(viewInstallationLock) { + + synchronized (viewInstallationLock) { if (currentView != null && currentView.getViewId() >= newView.getViewId()) { // old view - ignore it return; } - + if (currentView == null && !this.isJoined) { - for (InternalDistributedMember mbr: newView.getMembers()) { + for (InternalDistributedMember mbr : newView.getMembers()) { if (this.localAddress.equals(mbr)) { this.birthViewId = mbr.getVmViewId(); this.localAddress.setVmViewId(this.birthViewId); - GMSMember me = (GMSMember)this.localAddress.getNetMember(); + GMSMember me = (GMSMember) this.localAddress.getNetMember(); me.setBirthViewId(birthViewId); isJoined = true; break; } } } - + if (isNetworkPartition(newView)) { if (quorumRequired) { Set<InternalDistributedMember> crashes = newView.getActualCrashedMembers(currentView); - forceDisconnect( - LocalizedStrings.Network_partition_detected.toLocalizedString(crashes.size(), crashes)); + forceDisconnect(LocalizedStrings.Network_partition_detected.toLocalizedString(crashes.size(), crashes)); return; } } @@ -1046,7 +1084,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { preparedView = null; lastConflictingView = null; services.installView(newView); - + if (!newView.getCreator().equals(this.localAddress)) { if (newView.shouldBeCoordinator(this.localAddress)) { becomeCoordinator(); @@ -1057,20 +1095,20 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } } if (!this.isCoordinator) { - // get rid of outdated requests. It's possible some requests are + // get rid of outdated requests. It's possible some requests are // newer than the view just processed - the senders will have to // resend these - synchronized(viewRequests) { - for (Iterator<DistributionMessage> it = viewRequests.iterator(); it.hasNext(); ) { + synchronized (viewRequests) { + for (Iterator<DistributionMessage> it = viewRequests.iterator(); it.hasNext();) { DistributionMessage m = it.next(); if (m instanceof JoinRequestMessage) { it.remove(); } else if (m instanceof LeaveRequestMessage) { - if (!currentView.contains(((LeaveRequestMessage)m).getMemberID())) { + if (!currentView.contains(((LeaveRequestMessage) m).getMemberID())) { it.remove(); } } else if (m instanceof RemoveMemberMessage) { - if (!currentView.contains(((RemoveMemberMessage)m).getMemberID())) { + if (!currentView.contains(((RemoveMemberMessage) m).getMemberID())) { it.remove(); } } @@ -1078,11 +1116,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } } } - synchronized(removedMembers) { + synchronized (removedMembers) { removedMembers.clear(); } } - + /** * Sends a message declaring a network partition to the * members of the given view via Messenger @@ -1099,8 +1137,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { logger.debug("unable to send network partition message - continuing", e); } } - - + /** * returns true if this member thinks it is the membership coordinator * for the distributed system @@ -1108,21 +1145,20 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { public boolean isCoordinator() { return this.isCoordinator; } - + /** * return true if we're stopping or are stopped */ public boolean isStopping() { return this.isStopping; } - + /** * returns the currently prepared view, if any */ public NetView getPreparedView() { return this.preparedView; } - /** * check to see if the new view shows a drop of 51% or more @@ -1134,32 +1170,29 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { int oldWeight = currentView.memberWeight(); int failedWeight = newView.getCrashedMemberWeight(currentView); if (failedWeight > 0) { - if (logger.isInfoEnabled() - && !newView.getCreator().equals(localAddress)) { // view-creator logs this + if (logger.isInfoEnabled() && !newView.getCreator().equals(localAddress)) { // view-creator logs this newView.logCrashedMemberWeights(currentView, logger); } - int failurePoint = (int)(Math.round(51 * oldWeight) / 100.0); + int failurePoint = (int) (Math.round(51 * oldWeight) / 100.0); if (failedWeight > failurePoint && quorumLostView != newView) { quorumLostView = newView; - logger.warn("total weight lost in this view change is {} of {}. Quorum has been lost!", - failedWeight, oldWeight); + logger.warn("total weight lost in this view change is {} of {}. Quorum has been lost!", failedWeight, oldWeight); services.getManager().quorumLost(newView.getActualCrashedMembers(currentView), currentView); return true; } } return false; } - - + private void stopCoordinatorServices() { if (viewCreator != null && !viewCreator.isShutdown()) { viewCreator.shutdown(); } } - + public static void loadEmergencyClasses() { } - + @Override public void emergencyClose() { isStopping = true; @@ -1177,29 +1210,21 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { public void beHealthy() { } - - @Override public void start() { } - - @Override public void started() { this.localAddress = services.getMessenger().getMemberID(); } - - @Override public void stop() { logger.debug("JoinLeave stopping"); leave(); } - - @Override public void stopped() { } @@ -1210,12 +1235,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { viewProcessor.memberSuspected(initiator, suspect); } - - @Override public void leave() { boolean waitForProcessing = false; - synchronized(viewInstallationLock) { + synchronized (viewInstallationLock) { NetView view = currentView; isStopping = true; stopCoordinatorServices(); @@ -1223,15 +1246,14 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { if (view.size() > 1) { if (this.isCoordinator) { logger.debug("JoinLeave stopping coordination services"); - NetView newView = new NetView(view, view.getViewId()+1); + NetView newView = new NetView(view, view.getViewId() + 1); newView.remove(localAddress); InstallViewMessage m = new InstallViewMessage(newView, services.getAuthenticator().getCredentials(this.localAddress)); m.setRecipients(newView.getMembers()); services.getMessenger().send(m); waitForProcessing = true; - } - else { - List<InternalDistributedMember> coords = view.getPreferredCoordinators(Collections.<InternalDistributedMember>emptySet(), localAddress, 5); + } else { + List<InternalDistributedMember> coords = view.getPreferredCoordinators(Collections.<InternalDistributedMember> emptySet(), localAddress, 5); logger.debug("JoinLeave sending a leave request to {}", coords); LeaveRequestMessage m = new LeaveRequestMessage(coords, this.localAddress, "this member is shutting down"); @@ -1239,38 +1261,32 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { waitForProcessing = true; } } // view.size - }// view != null + } // view != null } if (waitForProcessing) { try { Thread.sleep(LEAVE_MESSAGE_SLEEP_TIME); - } - catch (InterruptedException e) { + } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } - - @Override public void remove(InternalDistributedMember m, String reason) { NetView v = this.currentView; - + services.getCancelCriterion().checkCancelInProgress(null); - + if (v != null && v.contains(m)) { Set<InternalDistributedMember> filter = new HashSet<>(); filter.add(m); - RemoveMemberMessage msg = new RemoveMemberMessage(v.getPreferredCoordinators(filter, getMemberID(), 5), - m, - reason); + RemoveMemberMessage msg = new RemoveMemberMessage(v.getPreferredCoordinators(filter, getMemberID(), 5), m, reason); msg.setSender(this.localAddress); processRemoveRequest(msg); if (!this.isCoordinator) { msg.resetRecipients(); - msg.setRecipients(v.getPreferredCoordinators(Collections.<InternalDistributedMember>emptySet(), - localAddress, 10)); + msg.setRecipients(v.getPreferredCoordinators(Collections.<InternalDistributedMember> emptySet(), localAddress, 10)); services.getMessenger().send(msg); } } @@ -1278,32 +1294,29 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { @Override public void memberShutdown(DistributedMember mbr, String reason) { - + if (this.isCoordinator) { - LeaveRequestMessage msg = new LeaveRequestMessage(Collections.singleton(this.localAddress), (InternalDistributedMember)mbr, reason); + LeaveRequestMessage msg = new LeaveRequestMessage(Collections.singleton(this.localAddress), (InternalDistributedMember) mbr, reason); recordViewRequest(msg); } } - @Override public void disableDisconnectOnQuorumLossForTesting() { this.quorumRequired = false; } - + @Override public void init(Services s) { this.services = s; - + DistributionConfig dc = services.getConfig().getDistributionConfig(); - if (dc.getMcastPort() != 0 - && dc.getLocators().trim().isEmpty() - && dc.getStartLocator().trim().isEmpty()) { - throw new GemFireConfigException("Multicast cannot be configured for a non-distributed cache." - + " Please configure the locator services for this cache using "+DistributionConfig.LOCATORS_NAME - + " or " + DistributionConfig.START_LOCATOR_NAME+"."); + if (dc.getMcastPort() != 0 && dc.getLocators().trim().isEmpty() && dc.getStartLocator().trim().isEmpty()) { + throw new GemFireConfigException( + "Multicast cannot be configured for a non-distributed cache." + " Please configure the locator services for this cache using " + + DistributionConfig.LOCATORS_NAME + " or " + DistributionConfig.START_LOCATOR_NAME + "."); } - + services.getMessenger().addHandler(JoinRequestMessage.class, this); services.getMessenger().addHandler(JoinResponseMessage.class, this); services.getMessenger().addHandler(InstallViewMessage.class, this); @@ -1324,9 +1337,9 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } ackCollectionTimeout = Integer.getInteger("gemfire.VIEW_ACK_TIMEOUT", ackCollectionTimeout).intValue(); this.viewAckTimeout = ackCollectionTimeout; - + this.quorumRequired = services.getConfig().getDistributionConfig().getEnableNetworkPartitionDetection(); - + DistributionConfig dconfig = services.getConfig().getDistributionConfig(); String bindAddr = dconfig.getBindAddress(); locators = GMSUtil.parseLocators(dconfig.getLocators(), bindAddr); @@ -1340,37 +1353,36 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { logger.debug("JoinLeave processing {}", m); switch (m.getDSFID()) { case JOIN_REQUEST: - processJoinRequest((JoinRequestMessage)m); + processJoinRequest((JoinRequestMessage) m); break; case JOIN_RESPONSE: - processJoinResponse((JoinResponseMessage)m); + processJoinResponse((JoinResponseMessage) m); break; case INSTALL_VIEW_MESSAGE: - processViewMessage((InstallViewMessage)m); + processViewMessage((InstallViewMessage) m); break; case VIEW_ACK_MESSAGE: - processViewAckMessage((ViewAckMessage)m); + processViewAckMessage((ViewAckMessage) m); break; case LEAVE_REQUEST_MESSAGE: - processLeaveRequest((LeaveRequestMessage)m); + processLeaveRequest((LeaveRequestMessage) m); break; case REMOVE_MEMBER_REQUEST: - processRemoveRequest((RemoveMemberMessage)m); + processRemoveRequest((RemoveMemberMessage) m); break; case FIND_COORDINATOR_REQ: - processFindCoordinatorRequest((FindCoordinatorRequest)m); + processFindCoordinatorRequest((FindCoordinatorRequest) m); break; case FIND_COORDINATOR_RESP: - processFindCoordinatorResponse((FindCoordinatorResponse)m); + processFindCoordinatorResponse((FindCoordinatorResponse) m); break; case NETWORK_PARTITION_MESSAGE: - processNetworkPartitionMessage((NetworkPartitionMessage)m); + processNetworkPartitionMessage((NetworkPartitionMessage) m); break; default: throw new IllegalArgumentException("unknown message type: " + m); } } - /** * returns the member IDs of the pending requests having the given @@ -1378,17 +1390,16 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { */ Set<InternalDistributedMember> getPendingRequestIDs(int theDSFID) { Set<InternalDistributedMember> result = new HashSet<>(); - synchronized(viewRequests) { - for (DistributionMessage msg: viewRequests) { + synchronized (viewRequests) { + for (DistributionMessage msg : viewRequests) { if (msg.getDSFID() == theDSFID) { - result.add(((HasMemberID)msg).getMemberID()); + result.add(((HasMemberID) msg).getMemberID()); } } } return result; } - - + class ViewReplyProcessor { volatile int viewId = -1; final Set<InternalDistributedMember> notRepliedYet = new HashSet<>(); @@ -1397,11 +1408,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { boolean waiting; final boolean isPrepareViewProcessor; final Set<InternalDistributedMember> pendingRemovals = new HashSet<>(); - + ViewReplyProcessor(boolean forPreparation) { this.isPrepareViewProcessor = forPreparation; } - + synchronized void initialize(int viewId, Set<InternalDistributedMember> recips) { waiting = true; this.viewId = viewId; @@ -1410,26 +1421,24 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { conflictingView = null; pendingRemovals.clear(); } - - synchronized void processPendingRequests(Set<InternalDistributedMember> pendingLeaves, - Set<InternalDistributedMember> pendingRemovals) { + + synchronized void processPendingRequests(Set<InternalDistributedMember> pendingLeaves, Set<InternalDistributedMember> pendingRemovals) { // there's no point in waiting for members who have already // requested to leave or who have been declared crashed. // We don't want to mix the two because pending removals // aren't reflected as having crashed in the current view // and need to cause a new view to be generated - for (InternalDistributedMember mbr: pendingLeaves) { + for (InternalDistributedMember mbr : pendingLeaves) { notRepliedYet.remove(mbr); } - for (InternalDistributedMember mbr: pendingRemovals) { + for (InternalDistributedMember mbr : pendingRemovals) { if (this.notRepliedYet.contains(mbr)) { this.pendingRemovals.add(mbr); } } } - - synchronized void memberSuspected(InternalDistributedMember initiator, - InternalDistributedMember suspect) { + + synchronized void memberSuspected(InternalDistributedMember initiator, InternalDistributedMember suspect) { if (waiting) { // we will do a final check on this member if it hasn't already // been done, so stop waiting for it now @@ -1440,14 +1449,14 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } } } - + synchronized void processLeaveRequest(InternalDistributedMember mbr) { if (waiting) { logger.debug("view response processor recording leave request for {}", mbr); stopWaitingFor(mbr); } } - + synchronized void processRemoveRequest(InternalDistributedMember mbr) { if (waiting) { logger.debug("view response processor recording remove request for {}", mbr); @@ -1455,12 +1464,12 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { checkIfDone(); } } - + synchronized void processViewResponse(int viewId, InternalDistributedMember sender, NetView conflictingView) { if (!waiting) { return; } - + if (viewId == this.viewId) { if (conflictingView != null) { this.conflictingViewSender = sender; @@ -1477,11 +1486,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { notRepliedYet.remove(mbr); checkIfDone(); } - + /** call with synchronized(this) */ private void checkIfDone() { - if (notRepliedYet.isEmpty() || - (pendingRemovals != null && pendingRemovals.containsAll(notRepliedYet))) { + if (notRepliedYet.isEmpty() || (pendingRemovals != null && pendingRemovals.containsAll(notRepliedYet))) { logger.debug("All anticipated view responses received - notifying waiting thread"); waiting = false; notify(); @@ -1489,15 +1497,14 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { logger.debug("Still waiting for these view replies: {}", notRepliedYet); } } - + Set<InternalDistributedMember> waitForResponses() { Set<InternalDistributedMember> result = this.notRepliedYet; long endOfWait = System.currentTimeMillis() + viewAckTimeout; try { - while (System.currentTimeMillis() < endOfWait - && (services.getCancelCriterion().cancelInProgress() == null)) { + while (System.currentTimeMillis() < endOfWait && (services.getCancelCriterion().cancelInProgress() == null)) { try { - synchronized(this) { + synchronized (this) { if (!waiting || result.isEmpty() || this.conflictingView != null) { break; } @@ -1522,56 +1529,53 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } return result; } - + NetView getConflictingView() { return this.conflictingView; } - + InternalDistributedMember getConflictingViewSender() { return this.conflictingViewSender; } - + Set<InternalDistributedMember> getUnresponsiveMembers() { return this.notRepliedYet; } } - - - - class ViewCreator extends Thread { boolean shutdown = false; volatile boolean waiting = false; - + NetView initialView; Set<InternalDistributedMember> initialLeaving; Set<InternalDistributedMember> initialRemovals; - + ViewCreator(String name, ThreadGroup tg) { super(tg, name); } - + void shutdown() { shutdown = true; - synchronized(viewRequests) { + synchronized (viewRequests) { viewRequests.notify(); interrupt(); } } - + boolean isShutdown() { return shutdown; } - + boolean isWaiting() { return waiting; } - + /** * All views should be sent by the ViewCreator thread, so * if this member becomes coordinator it may have an initial * view to transmit that announces the removal of the former coordinator to + * * @param newView * @param leaving - members leaving in this view * @param removals - members crashed in this view @@ -1581,12 +1585,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { this.initialLeaving = leaving; this.initialRemovals = removals; } - + private void sendInitialView() { if (initialView != null) { try { - prepareAndSendView(initialView, Collections.<InternalDistributedMember>emptyList(), - initialLeaving, initialRemovals); + prepareAndSendView(initialView, Collections.<InternalDistributedMember> emptyList(), initialLeaving, initialRemovals, null); } finally { this.initialView = null; this.initialLeaving = null; @@ -1603,7 +1606,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { long okayToCreateView = System.currentTimeMillis() + MEMBER_REQUEST_COLLECTION_INTERVAL; try { for (;;) { - synchronized(viewRequests) { + synchronized (viewRequests) { if (shutdown) { return; } @@ -1645,7 +1648,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } } } // synchronized - if (requests != null && !requests.isEmpty()) { + if (requests != null && !requests.isEmpty()) { logger.info("View Creator is processing {} requests for the next membership view", requests.size()); try { createAndSendView(requests); @@ -1659,7 +1662,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { shutdown = true; } } - + /** * Create a new membership view and send it to members (including crashed members). * Returns false if the view cannot be prepared successfully, true otherwise @@ -1678,17 +1681,17 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { oldMembers = Collections.emptyList(); } Set<InternalDistributedMember> oldIDs = new HashSet<>(); - - for (DistributionMessage msg: requests) { + + for (DistributionMessage msg : requests) { logger.debug("processing request {}", msg); InternalDistributedMember mbr = null; switch (msg.getDSFID()) { case JOIN_REQUEST: - mbr = ((JoinRequestMessage)msg).getMemberID(); - // see if an old member ID is being reused. If + mbr = ((JoinRequestMessage) msg).getMemberID(); + // see if an old member ID is being reused. If // so we'll remove it from the new view - for (InternalDistributedMember m: oldMembers) { + for (InternalDistributedMember m : oldMembers) { if (mbr.compareTo(m, false) == 0) { oldIDs.add(m); break; @@ -1710,84 +1713,82 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { removalReqs.add(mbr); removalReasons.add(((RemoveMemberMessage) msg).getReason()); } else { - sendRemoveMessages(Collections.<InternalDistributedMember>singletonList(mbr), - Collections.<String>singletonList(((RemoveMemberMessage)msg).getReason()), - currentView); + sendRemoveMessages(Collections.<InternalDistributedMember> singletonList(mbr), + Collections.<String> singletonList(((RemoveMemberMessage) msg).getReason()), currentView); } break; - default: + default: logger.warn("Unknown membership request encountered: {}", msg); break; } } - - for (InternalDistributedMember mbr: oldIDs) { + + for (InternalDistributedMember mbr : oldIDs) { if (!leaveReqs.contains(mbr) && !removalReqs.contains(mbr)) { removalReqs.add(mbr); removalReasons.add("Removal of old ID that has been reused"); } } - + if (removalReqs.isEmpty() && leaveReqs.isEmpty() && joinReqs.isEmpty()) { return; } - + NetView newView; - synchronized(viewInstallationLock) { + synchronized (viewInstallationLock) { int viewNumber = 0; List<InternalDistributedMember> mbrs; if (currentView == null) { mbrs = new ArrayList<InternalDistributedMember>(joinReqs.size()); } else { - viewNumber = currentView.getViewId()+1; + viewNumber = currentView.getViewId() + 1; mbrs = new ArrayList<InternalDistributedMember>(oldMembers); } mbrs.addAll(joinReqs); mbrs.removeAll(leaveReqs); mbrs.removeAll(removalReqs); - newView = new NetView(localAddress, viewNumber, mbrs, leaveReqs, - new HashSet<InternalDistributedMember>(removalReqs)); + newView = new NetView(localAddress, viewNumber, mbrs, leaveReqs, new HashSet<InternalDistributedMember>(removalReqs)); } - + // if there are no membership changes then abort creation of // the new view if (newView.getMembers().equals(currentView.getMembers())) { logger.info("membership hasn't changed - aborting new view {}", newView); return; } - - for (InternalDistributedMember mbr: joinReqs) { + + for (InternalDistributedMember mbr : joinReqs) { mbr.setVmViewId(newView.getViewId()); mbr.getNetMember().setSplitBrainEnabled(services.getConfig().isNetworkPartitionDetectionEnabled()); } // send removal messages before installing the view so we stop // getting messages from members that have been kicked out sendRemoveMessages(removalReqs, removalReasons, newView); - + // we want to always check for quorum loss but don't act on it // unless network-partition-detection is enabled - if ( !(isNetworkPartition(newView) && quorumRequired) ) { - sendJoinResponses(joinReqs, newView); + if (!(isNetworkPartition(newView) && quorumRequired)) { + // add socket ports of all members to join response + List<Integer> portsForMembers = new ArrayList<Integer>(newView.size()); + addPorts(newView, requests, portsForMembers); + sendJoinResponses(joinReqs, newView, portsForMembers); } - prepareAndSendView(newView, joinReqs, leaveReqs, newView.getCrashedMembers()); + prepareAndSendView(newView, joinReqs, leaveReqs, newView.getCrashedMembers(), requests); return; } - - + /** * This handles the 2-phase installation of the view */ - void prepareAndSendView(NetView newView, - List<InternalDistributedMember> joinReqs, - Set<InternalDistributedMember> leaveReqs, - Set<InternalDistributedMember> removalReqs) { + void prepareAndSendView(NetView newView, List<InternalDistributedMember> joinReqs, Set<InternalDistributedMember> leaveReqs, + Set<InternalDistributedMember> removalReqs, List<DistributionMessage> requests) { boolean prepared = false; do { if (this.shutdown || Thread.currentThread().isInterrupted()) { return; } - + if (quorumRequired && isNetworkPartition(newView)) { sendNetworkPartitionMessage(newView); try { @@ -1798,13 +1799,12 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { return; } Set<InternalDistributedMember> crashes = newView.getActualCrashedMembers(currentView); - forceDisconnect( - LocalizedStrings.Network_partition_detected.toLocalizedString(crashes.size(), crashes)); + forceDisconnect(LocalizedStrings.Network_partition_detected.toLocalizedString(crashes.size(), crashes)); shutdown = true; return; } - prepared = prepareView(newView, joinReqs); + prepared = prepareView(newView, joinReqs, requests); logger.debug("view preparation phase completed. prepared={}", prepared); if (prepared) { @@ -1827,12 +1827,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { List<InternalDistributedMember> failures = new ArrayList<>(currentView.getCrashedMembers().size() + unresponsive.size()); NetView conflictingView = prepareProcessor.getConflictingView(); - if (conflictingView != null - && !conflictingView.getCreator().equals(localAddress) - && conflictingView.getViewId() > newView.getViewId() + if (conflictingView != null && !conflictingView.getCreator().equals(localAddress) && conflictingView.getViewId() > newView.getViewId() && (lastConflictingView == null || conflictingView.getViewId() > lastConflictingView.getViewId())) { lastConflictingView = conflictingView; - logger.info("adding these crashed members from a conflicting view to the crash-set for the next view: {}\nconflicting view: {}", unresponsive, conflictingView); + logger.info("adding these crashed members from a conflicting view to the crash-set for the next view: {}\nconflicting view: {}", unresponsive, + conflictingView); failures.addAll(conflictingView.getCrashedMembers()); } @@ -1849,43 +1848,43 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { removalReqs.addAll(failures); List<InternalDistributedMember> newMembers = new ArrayList<>(newView.getMembers()); newMembers.removeAll(removalReqs); - newView = new NetView(localAddress, newView.getViewId()+1, newMembers, leaveReqs, - removalReqs); + newView = new NetView(localAddress, newView.getViewId() + 1, newMembers, leaveReqs, removalReqs); } } while (!prepared); - + lastConflictingView = null; - - sendView(newView, joinReqs); + + sendView(newView, joinReqs, requests); } - + /** * performs health checks on the collection of members, removing any that * are found to be healthy + * * @param mbrs */ private void removeHealthyMembers(Collection<InternalDistributedMember> mbrs) throws InterruptedException { - List<Callable<InternalDistributedMember>> checkers = new ArrayList<Callable<InternalDistributedMember>>(mbrs.size()); - + List<Callable<InternalDistributedMember>> checkers = new ArrayList<Callable<InternalDistributedMember>>(mbrs.size()); + Set<InternalDistributedMember> newRemovals = new HashSet<>(); Set<InternalDistributedMember> newLeaves = new HashSet<>(); - - synchronized(viewRequests) { - for (DistributionMessage msg: viewRequests) { + + synchronized (viewRequests) { + for (DistributionMessage msg : viewRequests) { switch (msg.getDSFID()) { case LEAVE_REQUEST_MESSAGE: - newLeaves.add(((LeaveRequestMessage)msg).getMemberID()); + newLeaves.add(((LeaveRequestMessage) msg).getMemberID()); break; case REMOVE_MEMBER_REQUEST: - newRemovals.add(((RemoveMemberMessage)msg).getMemberID()); + newRemovals.add(((RemoveMemberMessage) msg).getMemberID()); break; default: break; } } } - - for (InternalDistributedMember mbr: mbrs) { + + for (InternalDistributedMember mbr : mbrs) { if (newRemovals.contains(mbr)) { // no need to do a health check on a member who is already leaving logger.info("member {} is already scheduled for removal", mbr); @@ -1911,22 +1910,23 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } }); } - + mbrs.removeAll(newLeaves); - + if (mbrs.isEmpty()) { return; } - + ExecutorService svc = Executors.newFixedThreadPool(mbrs.size(), new ThreadFactory() { AtomicInteger i = new AtomicInteger(); + @Override public Thread newThread(Runnable r) { return new Thread(Services.getThreadGroup(), r, "GemFire View Creator verification thread " + i.incrementAndGet()); } }); - + try { List<Future<InternalDistributedMember>> futures; futures = svc.invokeAll(checkers); @@ -1957,5 +1957,5 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } } } - + }
