This is an automated email from the ASF dual-hosted git repository.

jensdeppe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new e5025d3  GEODE-4765: Add retries when retrieving cluster config (#1539)
e5025d3 is described below

commit e5025d399c56b1f86e36c37638108d27655886af
Author: Jens Deppe <[email protected]>
AuthorDate: Mon Mar 5 07:15:45 2018 -0800

    GEODE-4765: Add retries when retrieving cluster config (#1539)
    
    - This helps when a locator is reconnecting to the distributed system and
      isn't fully ready yet.
    - During reconnect, pass a fresh cache to the ClusterConfigurationService -
      this fixes the 'DLock destroyed' issue.
    - Also fixes GEODE-4730
---
 .../internal/ClusterConfigurationService.java      |   7 +-
 .../distributed/internal/InternalLocator.java      |   5 +-
 .../internal/cache/ClusterConfigurationLoader.java |  65 ++++++++++---
 .../ClusterConfigLocatorRestartDUnitTest.java      | 106 +++++++++++++++++++++
 4 files changed, 166 insertions(+), 17 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java
index 37a76a9..8592f49 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java
@@ -75,6 +75,7 @@ import org.apache.geode.cache.execute.ResultCollector;
 import org.apache.geode.distributed.DistributedLockService;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.distributed.LockServiceDestroyedException;
 import org.apache.geode.distributed.internal.locks.DLockService;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.InternalRegionArguments;
@@ -130,7 +131,7 @@ public class ClusterConfigurationService {
   private final AtomicReference<SharedConfigurationStatus> status = new 
AtomicReference<>();
 
   private final InternalCache cache;
-  private final DistributedLockService sharedConfigLockingService;
+  private DistributedLockService sharedConfigLockingService;
 
   public ClusterConfigurationService(InternalCache cache) throws IOException {
     this.cache = cache;
@@ -518,7 +519,7 @@ public class ClusterConfigurationService {
   public ConfigurationResponse createConfigurationResponse(Set<String> groups) 
throws IOException {
     ConfigurationResponse configResponse = null;
 
-    boolean isLocked = 
this.sharedConfigLockingService.lock(SHARED_CONFIG_LOCK_NAME, 5000, 5000);
+    boolean isLocked = lockSharedConfiguration();
     try {
       if (isLocked) {
         configResponse = new ConfigurationResponse();
@@ -536,7 +537,7 @@ public class ClusterConfigurationService {
         return configResponse;
       }
     } finally {
-      this.sharedConfigLockingService.unlock(SHARED_CONFIG_LOCK_NAME);
+      unlockSharedConfiguration();
     }
     return configResponse;
   }
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
index 7c7b5af..955723d 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
@@ -1041,7 +1041,10 @@ public class InternalLocator extends Locator implements 
ConnectListener {
         this.productUseLog.reopen();
       }
       this.productUseLog.monitorUse(newSystem);
-      startSharedConfigurationService();
+      if (isSharedConfigurationEnabled()) {
+        this.sharedConfig = new ClusterConfigurationService(newCache);
+        startSharedConfigurationService();
+      }
       if (!this.server.isAlive()) {
         logger.info("Locator restart: starting TcpServer");
         startTcpServer();
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/ClusterConfigurationLoader.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/ClusterConfigurationLoader.java
index 7716c13..7ec52aa 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/ClusterConfigurationLoader.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/ClusterConfigurationLoader.java
@@ -47,9 +47,13 @@ import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.UnmodifiableException;
 import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionException;
+import org.apache.geode.cache.execute.FunctionInvocationTargetException;
 import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.cache.execute.ResultCollector;
 import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.LockServiceDestroyedException;
 import org.apache.geode.distributed.internal.ClusterConfigurationService;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
@@ -69,6 +73,8 @@ public class ClusterConfigurationLoader {
 
   private static final Logger logger = LogService.getLogger();
 
+  private static final Function GET_CLUSTER_CONFIG_FUNCTION = new 
GetClusterConfigurationFunction();
+
   /**
    * Deploys the jars received from shared configuration, it undeploys any 
other jars that were not
    * part of shared configuration
@@ -279,21 +285,25 @@ public class ClusterConfigurationLoader {
     GetClusterConfigurationFunction function = new 
GetClusterConfigurationFunction();
 
     ConfigurationResponse response = null;
-    for (InternalDistributedMember locator : locatorList) {
-      ResultCollector resultCollector =
-          
FunctionService.onMember(locator).setArguments(groups).execute(function);
-      Object result = ((ArrayList) resultCollector.getResult()).get(0);
-      if (result instanceof ConfigurationResponse) {
-        response = (ConfigurationResponse) result;
-        response.setMember(locator);
-        break;
-      } else {
-        logger.error("Received invalid result from {}: {}", 
locator.toString(), result);
-        if (result instanceof Throwable) {
-          // log the stack trace.
-          logger.error(result.toString(), result);
+
+    int attempts = 6;
+    OUTER: while (attempts > 0) {
+      for (InternalDistributedMember locator : locatorList) {
+        logger.info("Attempting to retrieve cluster configuration from {} - {} 
attempts remaining",
+            locator.getName(), attempts);
+        response = requestConfigurationFromOneLocator(locator, groups);
+        if (response != null) {
+          break OUTER;
         }
       }
+
+      try {
+        Thread.sleep(10000);
+      } catch (InterruptedException ex) {
+        break;
+      }
+
+      attempts--;
     }
 
     // if the response is null
@@ -305,6 +315,35 @@ public class ClusterConfigurationLoader {
     return response;
   }
 
+  private ConfigurationResponse requestConfigurationFromOneLocator(
+      InternalDistributedMember locator, Set<String> groups) {
+    ConfigurationResponse configResponse = null;
+
+    try {
+      ResultCollector resultCollector = 
FunctionService.onMember(locator).setArguments(groups)
+          .execute(GET_CLUSTER_CONFIG_FUNCTION);
+      Object result = ((ArrayList) resultCollector.getResult()).get(0);
+      if (result instanceof ConfigurationResponse) {
+        configResponse = (ConfigurationResponse) result;
+        configResponse.setMember(locator);
+      } else {
+        logger.error("Received invalid result from {}: {}", 
locator.toString(), result);
+        if (result instanceof Throwable) {
+          // log the stack trace.
+          logger.error(result.toString(), result);
+        }
+      }
+    } catch (FunctionException fex) {
+      // Rethrow unless we're possibly reconnecting
+      if (!(fex.getCause() instanceof LockServiceDestroyedException
+          || fex.getCause() instanceof FunctionInvocationTargetException)) {
+        throw fex;
+      }
+    }
+
+    return configResponse;
+  }
+
   Set<String> getGroups(String groupString) {
     if (StringUtils.isBlank(groupString)) {
       return new HashSet<>();
diff --git 
a/geode-core/src/test/java/org/apache/geode/management/internal/configuration/ClusterConfigLocatorRestartDUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/management/internal/configuration/ClusterConfigLocatorRestartDUnitTest.java
new file mode 100644
index 0000000..bcc5ae0
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/management/internal/configuration/ClusterConfigLocatorRestartDUnitTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.configuration;
+
+import java.util.Properties;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import 
org.apache.geode.distributed.internal.membership.gms.MembershipManagerHelper;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+@Category(DistributedTest.class)
+public class ClusterConfigLocatorRestartDUnitTest {
+
+  @Rule
+  public ClusterStartupRule rule = new ClusterStartupRule();
+
+  @Rule
+  public GfshCommandRule gfsh = new GfshCommandRule();
+
+  @Test
+  public void serverRestartsAfterLocatorReconnects() throws Exception {
+    Properties props = new Properties();
+    props.setProperty(ConfigurationProperties.MAX_WAIT_TIME_RECONNECT, "5000");
+    MemberVM locator0 = rule.startLocatorVM(0, props);
+
+    rule.startServerVM(1, props, locator0.getPort());
+    MemberVM server2 = rule.startServerVM(2, props, locator0.getPort());
+
+    server2.invokeAsync(() -> MembershipManagerHelper
+        
.crashDistributedSystem(InternalDistributedSystem.getConnectedInstance()));
+    locator0.invokeAsync(() -> MembershipManagerHelper
+        
.crashDistributedSystem(InternalDistributedSystem.getConnectedInstance()));
+
+    // Wait some time to reconnect
+    Thread.sleep(10000);
+
+    rule.startServerVM(3, locator0.getPort());
+
+    gfsh.connectAndVerify(locator0);
+    gfsh.executeAndAssertThat("list 
members").statusIsSuccess().tableHasColumnOnlyWithValues("Name",
+        "locator-0", "server-1", "server-2", "server-3");
+  }
+
+  @Test
+  public void serverRestartsAfterOneLocatorDies() throws Exception {
+    IgnoredException.addIgnoredException("This member is no longer in the 
membership view");
+    IgnoredException.addIgnoredException("This node is no longer in the 
membership view");
+
+    // Otherwise we get a graceful shutdown...
+    Host.getHost(0).getVM(0).invoke(() -> {
+      if (InternalDistributedSystem.shutdownHook != null) {
+        
Runtime.getRuntime().removeShutdownHook(InternalDistributedSystem.shutdownHook);
+      }
+    });
+
+    Properties props = new Properties();
+    props.setProperty(ConfigurationProperties.MAX_WAIT_TIME_RECONNECT, "5000");
+    MemberVM locator0 = rule.startLocatorVM(0, props);
+    MemberVM locator1 = rule.startLocatorVM(1, props, locator0.getPort());
+
+    MemberVM server2 = rule.startServerVM(2, props, locator0.getPort(), 
locator1.getPort());
+    MemberVM server3 = rule.startServerVM(3, props, locator0.getPort(), 
locator1.getPort());
+
+    // Go away and don't come back
+    locator0.invokeAsync(() -> System.exit(1));
+
+    server3.invokeAsync(() -> MembershipManagerHelper
+        
.crashDistributedSystem(InternalDistributedSystem.getConnectedInstance()));
+
+    // Wait some time to reconnect
+    Thread.sleep(10000);
+
+    rule.startServerVM(4, locator1.getPort(), locator0.getPort());
+
+    gfsh.connectAndVerify(locator1);
+    gfsh.executeAndAssertThat("list 
members").statusIsSuccess().tableHasColumnOnlyWithValues("Name",
+        "locator-1", "server-2", "server-3", "server-4");
+
+    // Recover the VM so that subsequent rule cleanup works
+    Host.getHost(0).getVM(0).bounce();
+  }
+
+}

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to