This is an automated email from the ASF dual-hosted git repository.

jasonhuynh pushed a commit to branch support/1.13
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 8fb8caa8d5b6f79c2d60e7fbe3c76709f4a81891
Author: Aaron Lindsey <alind...@vmware.com>
AuthorDate: Thu Jun 18 15:33:52 2020 -0700

    GEODE-8241: Locator observes locator-wait-time (#5236)
    
    In the case where a locator starts up and is unable to connect to any
    other locators, it may decide to become the membership coordinator even
    if locator-wait-time has not elapsed.
    
    This change addresses this issue by requiring a locator to wait for
    locator-wait-time before deciding to become the coordinator.
    
    Co-authored-by: Aaron Lindsey <alind...@vmware.com>
    Co-authored-by: Vincent Ford <vf...@pivotal.io>
    Co-authored-by: Bill Burcham <bburc...@pivotal.io>
    (cherry picked from commit 720a4caea2ddb22296aa3225fc5264d2096cdf20)
---
 .../membership/gms/MembershipIntegrationTest.java  | 120 ++++++++++++++++++++-
 .../membership/gms/membership/GMSJoinLeave.java    |  21 +++-
 2 files changed, 133 insertions(+), 8 deletions(-)

diff --git a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/MembershipIntegrationTest.java b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/MembershipIntegrationTest.java
index fcc15b2..c875a52 100644
--- a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/MembershipIntegrationTest.java
+++ b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/MembershipIntegrationTest.java
@@ -14,13 +14,19 @@
  */
 package org.apache.geode.distributed.internal.membership.gms;
 
+import static org.apache.geode.distributed.internal.membership.api.MembershipConfig.DEFAULT_LOCATOR_WAIT_TIME;
+import static org.apache.geode.distributed.internal.membership.gms.membership.GMSJoinLeave.FIND_LOCATOR_RETRY_SLEEP;
+import static org.apache.geode.distributed.internal.membership.gms.membership.GMSJoinLeave.JOIN_RETRY_SLEEP;
+import static org.apache.geode.distributed.internal.membership.gms.membership.GMSJoinLeave.getMinimumRetriesBeforeBecomingCoordinator;
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.file.Path;
+import java.time.Duration;
 import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -47,6 +53,7 @@ import org.apache.geode.internal.inet.LocalHostUtil;
 import org.apache.geode.internal.serialization.DSFIDSerializer;
 import org.apache.geode.internal.serialization.internal.DSFIDSerializerImpl;
 import org.apache.geode.logging.internal.executors.LoggingExecutors;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
 
 /**
  * Tests of using the membership APIs to make multiple Membership systems that communicate
@@ -60,6 +67,9 @@ public class MembershipIntegrationTest {
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
+  @Rule
+  public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
+
   @Before
   public void before() throws IOException, MembershipConfigurationException {
     localHost = LocalHostUtil.getLocalHost();
@@ -172,6 +182,96 @@ public class MembershipIntegrationTest {
     stop(locator2, locator1);
   }
 
+  @Test
+  public void secondMembershipPausesForLocatorWaitTime()
+      throws IOException, MemberStartupException, InterruptedException {
+
+    /*
+     * Start a locator for the coordinator (membership) so we have a port for it.
+     *
+     * Its locator-wait-time is set to 0 so it eventually (soon after membership is started) forms a
+     * distributed system and becomes a coordinator.
+     */
+
+    final MembershipLocator<MemberIdentifier> coordinatorLocator = createLocator(0);
+    coordinatorLocator.start();
+    final int coordinatorLocatorPort = coordinatorLocator.getPort();
+
+    final Membership<MemberIdentifier> coordinatorMembership =
+        createMembership(coordinatorLocator, coordinatorLocatorPort);
+
+    /*
+     * We have not even started the membership yet — connection attempts will certainly fail until
+     * we do. This is a bit like the locator (host) not being present in DNS (yet).
+     */
+
+    /*
+     * Start a second locator and membership trying to join via the coordinator (membership) that
+     * hasn't yet started behind the port.
+     *
+     * Set its locator-wait-time so it'll not become a coordinator right away, allowing time for the
+     * other member to start and become a coordinator.
+     */
+
+    final MembershipLocator<MemberIdentifier> lateJoiningLocator = createLocator(0);
+    lateJoiningLocator.start();
+    final int lateJoiningLocatorPort = lateJoiningLocator.getPort();
+
+    final int[] locatorPorts = new int[] {coordinatorLocatorPort, lateJoiningLocatorPort};
+
+    // minimum duration a locator waits to become the coordinator, regardless of locatorWaitTime
+    final Duration minimumJoinWaitTime = Duration
+        // amount of sleep time per retry in GMSJoinLeave.join()
+        .ofMillis(JOIN_RETRY_SLEEP + FIND_LOCATOR_RETRY_SLEEP)
+        // expected number of retries in GMSJoinLeave.join()
+        .multipliedBy(getMinimumRetriesBeforeBecomingCoordinator(locatorPorts.length));
+
+    /*
+     * By setting locatorWaitTime to 10x the minimumJoinWaitTime, we are trying to make sure the
+     * locatorWaitTime is sufficiently larger than the minimum so we can reliably detect whether
+     * the lateJoiningMembership is waiting for the full locatorWaitTime and not just the minimum
+     * wait time.
+     */
+    final int locatorWaitTime = (int) (10 * minimumJoinWaitTime.getSeconds());
+
+    final MembershipConfig lateJoiningMembershipConfig =
+        createMembershipConfig(true, locatorWaitTime, locatorPorts);
+    final Membership<MemberIdentifier> lateJoiningMembership =
+        createMembership(lateJoiningMembershipConfig, lateJoiningLocator);
+
+    CompletableFuture<Void> lateJoiningMembershipStartup = executorServiceRule.runAsync(() -> {
+      try {
+        start(lateJoiningMembership);
+      } catch (MemberStartupException e) {
+        throw new RuntimeException(e);
+      }
+    });
+
+    /*
+     * By sleeping for 2x the minimumJoinWaitTime, we are trying to make sure we sleep for
+     * longer than the minimum but shorter than the locatorWaitTime so we can detect whether the
+     * lateJoiningMembership is waiting for the full locatorWaitTime and not just the minimum
+     * wait time.
+     */
+    Thread.sleep(2 * minimumJoinWaitTime.toMillis());
+
+    /*
+     * Now start the coordinator (membership), after waiting longer than the minimum wait time for
+     * connecting to a locator but shorter than the locator-wait-time.
+     */
+    start(coordinatorMembership);
+
+    await().untilAsserted(() -> assertThat(lateJoiningMembershipStartup).isCompleted());
+
+    await().untilAsserted(
+        () -> assertThat(coordinatorMembership.getView().getMembers()).hasSize(2));
+    await().untilAsserted(
+        () -> assertThat(lateJoiningMembership.getView().getMembers()).hasSize(2));
+
+    stop(coordinatorMembership, lateJoiningMembership);
+    stop(coordinatorLocator, lateJoiningLocator);
+  }
+
   private void start(final Membership<MemberIdentifier> membership)
       throws MemberStartupException {
     membership.start();
@@ -183,8 +283,15 @@ public class MembershipIntegrationTest {
       final int... locatorPorts)
       throws MembershipConfigurationException {
     final boolean isALocator = embeddedLocator != null;
-    final MembershipConfig config = createMembershipConfig(isALocator, locatorPorts);
+    final MembershipConfig config =
+        createMembershipConfig(isALocator, DEFAULT_LOCATOR_WAIT_TIME, locatorPorts);
+    return createMembership(config, embeddedLocator);
+  }
 
+  private Membership<MemberIdentifier> createMembership(
+      final MembershipConfig config,
+      final MembershipLocator<MemberIdentifier> embeddedLocator)
+      throws MembershipConfigurationException {
     final MemberIdentifierFactoryImpl memberIdFactory = new MemberIdentifierFactoryImpl();
 
     final TcpClient locatorClient =
@@ -200,7 +307,8 @@ public class MembershipIntegrationTest {
 
   private MembershipConfig createMembershipConfig(
       final boolean isALocator,
-      final int[] locatorPorts) {
+      final int locatorWaitTime,
+      final int... locatorPorts) {
     return new MembershipConfig() {
       public String getLocators() {
         return getLocatorString(locatorPorts);
@@ -213,6 +321,11 @@ public class MembershipIntegrationTest {
       public int getVmKind() {
         return isALocator ? MemberIdentifier.LOCATOR_DM_TYPE : MemberIdentifier.NORMAL_DM_TYPE;
       }
+
+      @Override
+      public int getLocatorWaitTime() {
+        return locatorWaitTime;
+      }
     };
   }
 
@@ -233,7 +346,8 @@ public class MembershipIntegrationTest {
         () -> LoggingExecutors.newCachedThreadPool("membership", false);
     Path locatorDirectory = temporaryFolder.newFolder().toPath();
 
-    final MembershipConfig config = createMembershipConfig(true, locatorPorts);
+    final MembershipConfig config =
+        createMembershipConfig(true, DEFAULT_LOCATOR_WAIT_TIME, locatorPorts);
 
     return MembershipLocatorBuilder.<MemberIdentifier>newLocatorBuilder(
         socketCreator,
diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index 59783e9..087533d 100644
--- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -91,10 +91,15 @@ public class GMSJoinLeave<ID extends MemberIdentifier> implements JoinLeave<ID>
   /**
    * amount of time to sleep before trying to join after a failed attempt
    */
-  private static final int JOIN_RETRY_SLEEP =
+  public static final int JOIN_RETRY_SLEEP =
       Integer.getInteger(GeodeGlossary.GEMFIRE_PREFIX + "join-retry-sleep", 1000);
 
   /**
+   * amount of time to sleep before trying to contact a locator after a failed attempt
+   */
+  public static final int FIND_LOCATOR_RETRY_SLEEP = 1_000;
+
+  /**
    * time to wait for a broadcast message to be transmitted by jgroups
    */
   private static final long BROADCAST_MESSAGE_SLEEP_TIME =
@@ -295,6 +300,11 @@ public class GMSJoinLeave<ID extends MemberIdentifier> implements JoinLeave<ID>
     return viewInstallationLock;
   }
 
+  @VisibleForTesting
+  public static int getMinimumRetriesBeforeBecomingCoordinator(int locatorsSize) {
+    return locatorsSize * 2;
+  }
+
   /**
    * attempt to join the distributed system loop send a join request to a locator & get a response
    * <p>
@@ -324,20 +334,22 @@ public class GMSJoinLeave<ID extends MemberIdentifier> implements JoinLeave<ID>
       long startTime = System.currentTimeMillis();
       long locatorGiveUpTime = startTime + locatorWaitTime;
       long giveupTime = startTime + timeout;
-      int minimumRetriesBeforeBecomingCoordinator = locators.size() * 2;
+      int minimumRetriesBeforeBecomingCoordinator =
+          getMinimumRetriesBeforeBecomingCoordinator(locators.size());
 
       for (int tries = 0; !this.isJoined && !this.isStopping; tries++) {
         logger.debug("searching for the membership coordinator");
         boolean found = findCoordinator();
         logger.info("Discovery state after looking for membership coordinator is {}",
             state);
+        long now = System.currentTimeMillis();
         if (found) {
           logger.info("found possible coordinator {}", state.possibleCoordinator);
           if (localAddress.preferredForCoordinator()
               && state.possibleCoordinator.equals(this.localAddress)) {
             // if we haven't contacted a member of a cluster maybe this node should
             // become the coordinator.
-            if (state.joinedMembersContacted <= 0 &&
+            if (state.joinedMembersContacted <= 0 && (now >= locatorGiveUpTime) &&
                 (tries >= minimumRetriesBeforeBecomingCoordinator ||
                     state.locatorsContacted >= locators.size())) {
               synchronized (viewInstallationLock) {
@@ -360,7 +372,6 @@ public class GMSJoinLeave<ID extends MemberIdentifier> implements JoinLeave<ID>
             }
           }
         } else {
-          long now = System.currentTimeMillis();
           if (state.locatorsContacted <= 0) {
             if (now > locatorGiveUpTime) {
               // break out of the loop and return false
@@ -1189,7 +1200,7 @@ public class GMSJoinLeave<ID extends MemberIdentifier> implements JoinLeave<ID>
           logger.info("Exception thrown when contacting a locator", problem);
           if (state.locatorsContacted == 0 && System.currentTimeMillis() < giveUpTime) {
             try {
-              Thread.sleep(1000);
+              Thread.sleep(FIND_LOCATOR_RETRY_SLEEP);
             } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               services.getCancelCriterion().checkCancelInProgress(e);

Reply via email to