Repository: incubator-geode Updated Branches: refs/heads/feature/GEODE-77 cfba7ae5d -> 5feab8241
GEODE-77: avoids creating a split-brain if joining times out and network partition detection is not enabled Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/5feab824 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/5feab824 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/5feab824 Branch: refs/heads/feature/GEODE-77 Commit: 5feab8241deb507c11152236b152b91d983eeace Parents: cfba7ae Author: Bruce Schuchardt <bschucha...@pivotal.io> Authored: Fri Oct 23 13:05:46 2015 -0700 Committer: Bruce Schuchardt <bschucha...@pivotal.io> Committed: Fri Oct 23 13:06:11 2015 -0700 ---------------------------------------------------------------------- .../membership/gms/fd/GMSHealthMonitor.java | 4 - .../gms/locator/FindCoordinatorResponse.java | 6 ++ .../membership/gms/membership/GMSJoinLeave.java | 95 ++++++++++++-------- .../membership/gms/messenger/Transport.java | 15 ++++ .../gemfire/internal/tcp/Connection.java | 15 +++- .../gms/membership/GMSJoinLeaveJUnitTest.java | 2 +- 6 files changed, 92 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5feab824/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java index ed9f214..1ca206f 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java @@ -266,10 +266,6 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } private void sendSuspectMessage(InternalDistributedMember mbr, String reason) { - if (beingSick || playingDead) { - logger.debug("sick member is not sending suspect message concerning {}", mbr); - return; - } logger.info("Sending suspect request {} reason=\"{}\"", mbr, reason); SuspectRequest sr = new SuspectRequest(mbr, reason); List<SuspectRequest> sl = new ArrayList<SuspectRequest>(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5feab824/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java index 5f9576b..0d2ba68 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java @@ -65,6 +65,12 @@ public class FindCoordinatorResponse extends HighPriorityDistributionMessage return coordinator; } + /** + * When the response comes from a locator via TcpClient this + * will return the locators member ID. If the locator hasn't + * yet joined this may be null. + * @return + */ public InternalDistributedMember getSenderId() { return senderId; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5feab824/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java index 57611e6..5a792eb 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java @@ -163,7 +163,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { Set<InternalDistributedMember> registrants = new HashSet<>(); InternalDistributedMember possibleCoordinator; int viewId = -1; - boolean hasContactedALocator; + int locatorsContacted = 0; + boolean hasContactedAJoinedLocator; NetView view; Set<FindCoordinatorResponse> responses = new HashSet<>(); @@ -201,10 +202,12 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { SearchState state = searchState; + long locatorWaitTime = services.getConfig().getLocatorWaitTime() * 1000; long timeout = services.getConfig().getJoinTimeout(); logger.debug("join timeout is set to {}", timeout); long retrySleep = JOIN_RETRY_SLEEP; long startTime = System.currentTimeMillis(); + long locatorGiveUpTime = startTime + locatorWaitTime; long giveupTime = startTime + timeout; for (int tries=0; !this.isJoined; tries++) { @@ -232,7 +235,16 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } } } else { - if (System.currentTimeMillis() > giveupTime) { + long now = System.currentTimeMillis(); + if (state.locatorsContacted <= 0) { + if (now > locatorGiveUpTime) { + // break out of the loop and return false + break; + } + // reset the tries count and timer since we haven't actually tried to join yet + tries = 0; + giveupTime = now + timeout; + } else if (System.currentTimeMillis() > giveupTime) { break; } } @@ -251,7 +263,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { // to preserve old behavior we need to throw a SystemConnectException if // unable to contact any of the locators - if (!this.isJoined && state.hasContactedALocator) { + if (!this.isJoined && state.hasContactedAJoinedLocator) { throw new SystemConnectException("Unable to join the distributed system in " + (System.currentTimeMillis()-startTime) + "ms"); } @@ -750,24 +762,27 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { assert this.localAddress != null; - // TODO - should we try more than one preferred coordinator - // before jumping to asking view-members who the coordinator is? - if ( !state.alreadyTried.isEmpty() && state.view != null) { + // If we've already tried to bootstrap from locators that + // haven't joined the system (e.g., a collocated locator) + // then jump to using the membership view to try to find + // the coordinator + if ( !state.hasContactedAJoinedLocator && state.view != null) { return findCoordinatorFromView(); } FindCoordinatorRequest request = new FindCoordinatorRequest(this.localAddress, state.alreadyTried, state.viewId); Set<InternalDistributedMember> coordinators = new HashSet<InternalDistributedMember>(); - long waitTime = services.getConfig().getLocatorWaitTime() * 1000; - if (waitTime <= 0) { - waitTime = services.getConfig().getMemberTimeout() * 2; - } - long giveUpTime = System.currentTimeMillis() + waitTime; + + long giveUpTime = System.currentTimeMillis() + services.getConfig().getLocatorWaitTime() * 1000; + int connectTimeout = (int)services.getConfig().getMemberTimeout(); boolean anyResponses = false; boolean flagsSet = false; logger.debug("sending {} to {}", request, locators); + + state.hasContactedAJoinedLocator = false; + state.locatorsContacted = 0; do { for (InetSocketAddress addr: locators) { @@ -776,42 +791,46 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { addr.getAddress(), addr.getPort(), request, connectTimeout, true); FindCoordinatorResponse response = (o instanceof FindCoordinatorResponse) ? (FindCoordinatorResponse)o : null; - if (response != null && response.getCoordinator() != null) { - anyResponses = true; - NetView v = response.getView(); - int viewId = v == null? -1 : v.getViewId(); - if (viewId > state.viewId) { - // if the view has changed it is possible that a member - // that we already tried to join with will become coordinator - state.alreadyTried.clear(); - state.viewId = viewId; - state.view = v; - state.registrants.clear(); - if (response.getRegistrants() != null) { - state.registrants.addAll(response.getRegistrants()); - } + // TODO we don't want to give up on the locators if we receive + // a response from a locator that's joined the system. Otherwise + // we'll give up and cause a split-brain + if (response != null) { + state.locatorsContacted++; + if (response.getSenderId() != null && response.getSenderId().getVmViewId() >= 0) { + state.hasContactedAJoinedLocator = true; } - coordinators.add(response.getCoordinator()); - if (!flagsSet) { - flagsSet = true; - inheritSettingsFromLocator(addr, response); + if (response.getCoordinator() != null) { + anyResponses = true; + NetView v = response.getView(); + int viewId = v == null? -1 : v.getViewId(); + if (viewId > state.viewId) { + // if the view has changed it is possible that a member + // that we already tried to join with will become coordinator + state.alreadyTried.clear(); + state.viewId = viewId; + state.view = v; + state.registrants.clear(); + if (response.getRegistrants() != null) { + state.registrants.addAll(response.getRegistrants()); + } + } + coordinators.add(response.getCoordinator()); + if (!flagsSet) { + flagsSet = true; + inheritSettingsFromLocator(addr, response); + } } } } catch (IOException | ClassNotFoundException problem) { } } - if (coordinators.isEmpty()) { - return false; - } - if (!anyResponses) { - try { Thread.sleep(1000); } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return false; - } - } } while (!anyResponses && System.currentTimeMillis() < giveUpTime); + if (coordinators.isEmpty()) { + return false; + } + Iterator<InternalDistributedMember> it = coordinators.iterator(); if (coordinators.size() == 1) { state.possibleCoordinator = it.next(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5feab824/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/Transport.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/Transport.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/Transport.java index adb49b9..9f91a74 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/Transport.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/Transport.java @@ -61,4 +61,19 @@ public class Transport extends UDP { super.init(); } + /* + * (non-Javadoc) + * @see org.jgroups.protocols.UDP#stop() + * JGroups is not terminating its timer. I contacted the jgroups-users + * email list about this. + */ + @Override + public void stop() { + super.stop(); + if (!getTimer().isShutdown()) { + getTimer().stop(); + } + } + + } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5feab824/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java index 2e903f7..9f079fa 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java @@ -1884,12 +1884,18 @@ public class Connection implements Runnable { } catch (ClosedChannelException e) { this.readerShuttingDown = true; + if (this.owner.getConduit().getCancelCriterion().cancelInProgress() != null) { + initiateSuspicionIfShared(); + } try { requestClose(LocalizedStrings.Connection_CLOSEDCHANNELEXCEPTION_IN_CHANNEL_READ_0.toLocalizedString(e)); } catch (Exception ex) {} return; } catch (IOException e) { + if (this.owner.getConduit().getCancelCriterion().cancelInProgress() != null) { + initiateSuspicionIfShared(); + } if (! isSocketClosed() && !"Socket closed".equalsIgnoreCase(e.getMessage()) // needed for Solaris jdk 1.4.2_08 ) { @@ -1902,7 +1908,6 @@ public class Connection implements Runnable { } } } - initiateSuspicionIfShared(); this.readerShuttingDown = true; try { requestClose(LocalizedStrings.Connection_IOEXCEPTION_IN_CHANNEL_READ_0.toLocalizedString(e)); @@ -1914,6 +1919,9 @@ public class Connection implements Runnable { if (!stopped && ! isSocketClosed() ) { logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_0_EXCEPTION_IN_CHANNEL_READ, p2pReaderName()), e); } + if (this.owner.getConduit().getCancelCriterion().cancelInProgress() != null) { + initiateSuspicionIfShared(); + } this.readerShuttingDown = true; try { requestClose(LocalizedStrings.Connection_0_EXCEPTION_IN_CHANNEL_READ.toLocalizedString(e)); @@ -2434,6 +2442,9 @@ public class Connection implements Runnable { this.stopped = true; } catch (IOException io) { + if (this.owner.getConduit().getCancelCriterion().cancelInProgress() != null) { + initiateSuspicionIfShared(); + } boolean closed = isSocketClosed() || "Socket closed".equalsIgnoreCase(io.getMessage()); // needed for Solaris jdk 1.4.2_08 if (!closed) { @@ -2441,7 +2452,6 @@ public class Connection implements Runnable { logger.debug("{} io exception for {}", p2pReaderName(), this, io); } } - initiateSuspicionIfShared(); this.readerShuttingDown = true; try { requestClose(LocalizedStrings.Connection_IOEXCEPTION_RECEIVED_0.toLocalizedString(io)); @@ -2468,6 +2478,7 @@ public class Connection implements Runnable { if (!stopped && !(e instanceof InterruptedException) ) { logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_0_EXCEPTION_RECEIVED, p2pReaderName()), e); } + initiateSuspicionIfShared(); if (isSocketClosed()) { stopped = true; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5feab824/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java index 42594cf..41b0df7 100644 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java @@ -136,7 +136,7 @@ public class GMSJoinLeaveJUnitTest { mockMembers[1].setVmViewId(viewId-1); set.add(mockMembers[1]); state.alreadyTried = set; - state.hasContactedALocator = true; + state.hasContactedAJoinedLocator = true; // simulate a response being received InternalDistributedMember sender = mockMembers[2];