This is an automated email from the ASF dual-hosted git repository. jasonhuynh pushed a commit to branch support/1.13 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 8fb8caa8d5b6f79c2d60e7fbe3c76709f4a81891 Author: Aaron Lindsey <alind...@vmware.com> AuthorDate: Thu Jun 18 15:33:52 2020 -0700 GEODE-8241: Locator observes locator-wait-time (#5236) In the case where a locator starts up and is unable to connect to any other locators, it may decide to become the membership coordinator even if locator-wait-time has not elapsed. This change addresses this issue by requiring a locator to wait for locator-wait-time before deciding to become the coordinator. Co-authored-by: Aaron Lindsey <alind...@vmware.com> Co-authored-by: Vincent Ford <vf...@pivotal.io> Co-authored-by: Bill Burcham <bburc...@pivotal.io> (cherry picked from commit 720a4caea2ddb22296aa3225fc5264d2096cdf20) --- .../membership/gms/MembershipIntegrationTest.java | 120 ++++++++++++++++++++- .../membership/gms/membership/GMSJoinLeave.java | 21 +++- 2 files changed, 133 insertions(+), 8 deletions(-) diff --git a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/MembershipIntegrationTest.java b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/MembershipIntegrationTest.java index fcc15b2..c875a52 100644 --- a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/MembershipIntegrationTest.java +++ b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/MembershipIntegrationTest.java @@ -14,13 +14,19 @@ */ package org.apache.geode.distributed.internal.membership.gms; +import static org.apache.geode.distributed.internal.membership.api.MembershipConfig.DEFAULT_LOCATOR_WAIT_TIME; +import static org.apache.geode.distributed.internal.membership.gms.membership.GMSJoinLeave.FIND_LOCATOR_RETRY_SLEEP; +import static org.apache.geode.distributed.internal.membership.gms.membership.GMSJoinLeave.JOIN_RETRY_SLEEP; +import static org.apache.geode.distributed.internal.membership.gms.membership.GMSJoinLeave.getMinimumRetriesBeforeBecomingCoordinator; import static org.apache.geode.test.awaitility.GeodeAwaitility.await; import static org.assertj.core.api.Assertions.assertThat; import java.io.IOException; import java.net.InetAddress; import java.nio.file.Path; +import java.time.Duration; import java.util.Arrays; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -47,6 +53,7 @@ import org.apache.geode.internal.inet.LocalHostUtil; import org.apache.geode.internal.serialization.DSFIDSerializer; import org.apache.geode.internal.serialization.internal.DSFIDSerializerImpl; import org.apache.geode.logging.internal.executors.LoggingExecutors; +import org.apache.geode.test.junit.rules.ExecutorServiceRule; /** * Tests of using the membership APIs to make multiple Membership systems that communicate @@ -60,6 +67,9 @@ public class MembershipIntegrationTest { @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + @Rule + public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule(); + @Before public void before() throws IOException, MembershipConfigurationException { localHost = LocalHostUtil.getLocalHost(); @@ -172,6 +182,96 @@ public class MembershipIntegrationTest { stop(locator2, locator1); } + @Test + public void secondMembershipPausesForLocatorWaitTime() + throws IOException, MemberStartupException, InterruptedException { + + /* + * Start a locator for the coordinator (membership) so we have a port for it. + * + * Its locator-wait-time is set to 0 so it eventually (soon after membership is started) forms a + * distributed system and becomes a coordinator. + */ + + final MembershipLocator<MemberIdentifier> coordinatorLocator = createLocator(0); + coordinatorLocator.start(); + final int coordinatorLocatorPort = coordinatorLocator.getPort(); + + final Membership<MemberIdentifier> coordinatorMembership = + createMembership(coordinatorLocator, coordinatorLocatorPort); + + /* + * We have not even started the membership yet — connection attempts will certainly fail until + * we do. This is a bit like the locator (host) not being present in DNS (yet). + */ + + /* + * Start a second locator and membership trying to join via the coordinator (membership) that + * hasn't yet started behind the port. + * + * Set its locator-wait-time so it'll not become a coordinator right away, allowing time for the + * other member to start and become a coordinator. + */ + + final MembershipLocator<MemberIdentifier> lateJoiningLocator = createLocator(0); + lateJoiningLocator.start(); + final int lateJoiningLocatorPort = lateJoiningLocator.getPort(); + + final int[] locatorPorts = new int[] {coordinatorLocatorPort, lateJoiningLocatorPort}; + + // minimum duration a locator waits to become the coordinator, regardless of locatorWaitTime + final Duration minimumJoinWaitTime = Duration + // amount of sleep time per retry in GMSJoinLeave.join() + .ofMillis(JOIN_RETRY_SLEEP + FIND_LOCATOR_RETRY_SLEEP) + // expected number of retries in GMSJoinLeave.join() + .multipliedBy(getMinimumRetriesBeforeBecomingCoordinator(locatorPorts.length)); + + /* + * By setting locatorWaitTime to 10x the minimumJoinWaitTime, we are trying to make sure the + * locatorWaitTime is sufficiently larger than the minimum so we can reliably detect whether + * the lateJoiningMembership is waiting for the full locatorWaitTime and not just the minimum + * wait time. + */ + final int locatorWaitTime = (int) (10 * minimumJoinWaitTime.getSeconds()); + + final MembershipConfig lateJoiningMembershipConfig = + createMembershipConfig(true, locatorWaitTime, locatorPorts); + final Membership<MemberIdentifier> lateJoiningMembership = + createMembership(lateJoiningMembershipConfig, lateJoiningLocator); + + CompletableFuture<Void> lateJoiningMembershipStartup = executorServiceRule.runAsync(() -> { + try { + start(lateJoiningMembership); + } catch (MemberStartupException e) { + throw new RuntimeException(e); + } + }); + + /* + * By sleeping for 2x the minimumJoinWaitTime, we are trying to make sure we sleep for + * longer than the minimum but shorter than the locatorWaitTime so we can detect whether the + * lateJoiningMembership is waiting for the full locatorWaitTime and not just the minimum + * wait time. + */ + Thread.sleep(2 * minimumJoinWaitTime.toMillis()); + + /* + * Now start the coordinator (membership), after waiting longer than the minimum wait time for + * connecting to a locator but shorter than the locator-wait-time. + */ + start(coordinatorMembership); + + await().untilAsserted(() -> assertThat(lateJoiningMembershipStartup).isCompleted()); + + await().untilAsserted( + () -> assertThat(coordinatorMembership.getView().getMembers()).hasSize(2)); + await().untilAsserted( + () -> assertThat(lateJoiningMembership.getView().getMembers()).hasSize(2)); + + stop(coordinatorMembership, lateJoiningMembership); + stop(coordinatorLocator, lateJoiningLocator); + } + private void start(final Membership<MemberIdentifier> membership) throws MemberStartupException { membership.start(); @@ -183,8 +283,15 @@ public class MembershipIntegrationTest { final int... locatorPorts) throws MembershipConfigurationException { final boolean isALocator = embeddedLocator != null; - final MembershipConfig config = createMembershipConfig(isALocator, locatorPorts); + final MembershipConfig config = + createMembershipConfig(isALocator, DEFAULT_LOCATOR_WAIT_TIME, locatorPorts); + return createMembership(config, embeddedLocator); + } + private Membership<MemberIdentifier> createMembership( + final MembershipConfig config, + final MembershipLocator<MemberIdentifier> embeddedLocator) + throws MembershipConfigurationException { final MemberIdentifierFactoryImpl memberIdFactory = new MemberIdentifierFactoryImpl(); final TcpClient locatorClient = @@ -200,7 +307,8 @@ public class MembershipIntegrationTest { private MembershipConfig createMembershipConfig( final boolean isALocator, - final int[] locatorPorts) { + final int locatorWaitTime, + final int... locatorPorts) { return new MembershipConfig() { public String getLocators() { return getLocatorString(locatorPorts); @@ -213,6 +321,11 @@ public class MembershipIntegrationTest { public int getVmKind() { return isALocator ? MemberIdentifier.LOCATOR_DM_TYPE : MemberIdentifier.NORMAL_DM_TYPE; } + + @Override + public int getLocatorWaitTime() { + return locatorWaitTime; + } }; } @@ -233,7 +346,8 @@ public class MembershipIntegrationTest { () -> LoggingExecutors.newCachedThreadPool("membership", false); Path locatorDirectory = temporaryFolder.newFolder().toPath(); - final MembershipConfig config = createMembershipConfig(true, locatorPorts); + final MembershipConfig config = + createMembershipConfig(true, DEFAULT_LOCATOR_WAIT_TIME, locatorPorts); return MembershipLocatorBuilder.<MemberIdentifier>newLocatorBuilder( socketCreator, diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java index 59783e9..087533d 100644 --- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java +++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java @@ -91,10 +91,15 @@ public class GMSJoinLeave<ID extends MemberIdentifier> implements JoinLeave<ID> /** * amount of time to sleep before trying to join after a failed attempt */ - private static final int JOIN_RETRY_SLEEP = + public static final int JOIN_RETRY_SLEEP = Integer.getInteger(GeodeGlossary.GEMFIRE_PREFIX + "join-retry-sleep", 1000); /** + * amount of time to sleep before trying to contact a locator after a failed attempt + */ + public static final int FIND_LOCATOR_RETRY_SLEEP = 1_000; + + /** * time to wait for a broadcast message to be transmitted by jgroups */ private static final long BROADCAST_MESSAGE_SLEEP_TIME = @@ -295,6 +300,11 @@ public class GMSJoinLeave<ID extends MemberIdentifier> implements JoinLeave<ID> return viewInstallationLock; } + @VisibleForTesting + public static int getMinimumRetriesBeforeBecomingCoordinator(int locatorsSize) { + return locatorsSize * 2; + } + /** * attempt to join the distributed system loop send a join request to a locator & get a response * <p> @@ -324,20 +334,22 @@ public class GMSJoinLeave<ID extends MemberIdentifier> implements JoinLeave<ID> long startTime = System.currentTimeMillis(); long locatorGiveUpTime = startTime + locatorWaitTime; long giveupTime = startTime + timeout; - int minimumRetriesBeforeBecomingCoordinator = locators.size() * 2; + int minimumRetriesBeforeBecomingCoordinator = + getMinimumRetriesBeforeBecomingCoordinator(locators.size()); for (int tries = 0; !this.isJoined && !this.isStopping; tries++) { logger.debug("searching for the membership coordinator"); boolean found = findCoordinator(); logger.info("Discovery state after looking for membership coordinator is {}", state); + long now = System.currentTimeMillis(); if (found) { logger.info("found possible coordinator {}", state.possibleCoordinator); if (localAddress.preferredForCoordinator() && state.possibleCoordinator.equals(this.localAddress)) { // if we haven't contacted a member of a cluster maybe this node should // become the coordinator. - if (state.joinedMembersContacted <= 0 && + if (state.joinedMembersContacted <= 0 && (now >= locatorGiveUpTime) && (tries >= minimumRetriesBeforeBecomingCoordinator || state.locatorsContacted >= locators.size())) { synchronized (viewInstallationLock) { @@ -360,7 +372,6 @@ public class GMSJoinLeave<ID extends MemberIdentifier> implements JoinLeave<ID> } } } else { - long now = System.currentTimeMillis(); if (state.locatorsContacted <= 0) { if (now > locatorGiveUpTime) { // break out of the loop and return false @@ -1189,7 +1200,7 @@ public class GMSJoinLeave<ID extends MemberIdentifier> implements JoinLeave<ID> logger.info("Exception thrown when contacting a locator", problem); if (state.locatorsContacted == 0 && System.currentTimeMillis() < giveUpTime) { try { - Thread.sleep(1000); + Thread.sleep(FIND_LOCATOR_RETRY_SLEEP); } catch (InterruptedException e) { Thread.currentThread().interrupt(); services.getCancelCriterion().checkCancelInProgress(e);