This is an automated email from the ASF dual-hosted git repository.
jensdeppe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new e5025d3 GEODE-4765: Add retries when retrieving cluster config (#1539)
e5025d3 is described below
commit e5025d399c56b1f86e36c37638108d27655886af
Author: Jens Deppe <[email protected]>
AuthorDate: Mon Mar 5 07:15:45 2018 -0800
GEODE-4765: Add retries when retrieving cluster config (#1539)
- This helps when a locator is reconnecting to the distributed system and
isn't fully ready yet.
- During reconnect, pass a fresh cache to the ClusterConfigurationService -
this fixes the 'DLock destroyed' issue.
- Also fixes GEODE-4730
---
.../internal/ClusterConfigurationService.java | 7 +-
.../distributed/internal/InternalLocator.java | 5 +-
.../internal/cache/ClusterConfigurationLoader.java | 65 ++++++++++---
.../ClusterConfigLocatorRestartDUnitTest.java | 106 +++++++++++++++++++++
4 files changed, 166 insertions(+), 17 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java
index 37a76a9..8592f49 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java
@@ -75,6 +75,7 @@ import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.distributed.LockServiceDestroyedException;
import org.apache.geode.distributed.internal.locks.DLockService;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalRegionArguments;
@@ -130,7 +131,7 @@ public class ClusterConfigurationService {
private final AtomicReference<SharedConfigurationStatus> status = new
AtomicReference<>();
private final InternalCache cache;
- private final DistributedLockService sharedConfigLockingService;
+ private DistributedLockService sharedConfigLockingService;
public ClusterConfigurationService(InternalCache cache) throws IOException {
this.cache = cache;
@@ -518,7 +519,7 @@ public class ClusterConfigurationService {
public ConfigurationResponse createConfigurationResponse(Set<String> groups)
throws IOException {
ConfigurationResponse configResponse = null;
- boolean isLocked =
this.sharedConfigLockingService.lock(SHARED_CONFIG_LOCK_NAME, 5000, 5000);
+ boolean isLocked = lockSharedConfiguration();
try {
if (isLocked) {
configResponse = new ConfigurationResponse();
@@ -536,7 +537,7 @@ public class ClusterConfigurationService {
return configResponse;
}
} finally {
- this.sharedConfigLockingService.unlock(SHARED_CONFIG_LOCK_NAME);
+ unlockSharedConfiguration();
}
return configResponse;
}
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
index 7c7b5af..955723d 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
@@ -1041,7 +1041,10 @@ public class InternalLocator extends Locator implements
ConnectListener {
this.productUseLog.reopen();
}
this.productUseLog.monitorUse(newSystem);
- startSharedConfigurationService();
+ if (isSharedConfigurationEnabled()) {
+ this.sharedConfig = new ClusterConfigurationService(newCache);
+ startSharedConfigurationService();
+ }
if (!this.server.isAlive()) {
logger.info("Locator restart: starting TcpServer");
startTcpServer();
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/ClusterConfigurationLoader.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/ClusterConfigurationLoader.java
index 7716c13..7ec52aa 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/ClusterConfigurationLoader.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/ClusterConfigurationLoader.java
@@ -47,9 +47,13 @@ import org.apache.logging.log4j.Logger;
import org.apache.geode.UnmodifiableException;
import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionException;
+import org.apache.geode.cache.execute.FunctionInvocationTargetException;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.LockServiceDestroyedException;
import org.apache.geode.distributed.internal.ClusterConfigurationService;
import org.apache.geode.distributed.internal.DistributionConfig;
import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
@@ -69,6 +73,8 @@ public class ClusterConfigurationLoader {
private static final Logger logger = LogService.getLogger();
+ private static final Function GET_CLUSTER_CONFIG_FUNCTION = new
GetClusterConfigurationFunction();
+
/**
* Deploys the jars received from shared configuration, it undeploys any
other jars that were not
* part of shared configuration
@@ -279,21 +285,25 @@ public class ClusterConfigurationLoader {
GetClusterConfigurationFunction function = new
GetClusterConfigurationFunction();
ConfigurationResponse response = null;
- for (InternalDistributedMember locator : locatorList) {
- ResultCollector resultCollector =
-
FunctionService.onMember(locator).setArguments(groups).execute(function);
- Object result = ((ArrayList) resultCollector.getResult()).get(0);
- if (result instanceof ConfigurationResponse) {
- response = (ConfigurationResponse) result;
- response.setMember(locator);
- break;
- } else {
- logger.error("Received invalid result from {}: {}",
locator.toString(), result);
- if (result instanceof Throwable) {
- // log the stack trace.
- logger.error(result.toString(), result);
+
+ int attempts = 6;
+ OUTER: while (attempts > 0) {
+ for (InternalDistributedMember locator : locatorList) {
+ logger.info("Attempting to retrieve cluster configuration from {} - {}
attempts remaining",
+ locator.getName(), attempts);
+ response = requestConfigurationFromOneLocator(locator, groups);
+ if (response != null) {
+ break OUTER;
}
}
+
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException ex) {
+ break;
+ }
+
+ attempts--;
}
// if the response is null
@@ -305,6 +315,35 @@ public class ClusterConfigurationLoader {
return response;
}
+ private ConfigurationResponse requestConfigurationFromOneLocator(
+ InternalDistributedMember locator, Set<String> groups) {
+ ConfigurationResponse configResponse = null;
+
+ try {
+ ResultCollector resultCollector =
FunctionService.onMember(locator).setArguments(groups)
+ .execute(GET_CLUSTER_CONFIG_FUNCTION);
+ Object result = ((ArrayList) resultCollector.getResult()).get(0);
+ if (result instanceof ConfigurationResponse) {
+ configResponse = (ConfigurationResponse) result;
+ configResponse.setMember(locator);
+ } else {
+ logger.error("Received invalid result from {}: {}",
locator.toString(), result);
+ if (result instanceof Throwable) {
+ // log the stack trace.
+ logger.error(result.toString(), result);
+ }
+ }
+ } catch (FunctionException fex) {
+ // Rethrow unless we're possibly reconnecting
+ if (!(fex.getCause() instanceof LockServiceDestroyedException
+ || fex.getCause() instanceof FunctionInvocationTargetException)) {
+ throw fex;
+ }
+ }
+
+ return configResponse;
+ }
+
Set<String> getGroups(String groupString) {
if (StringUtils.isBlank(groupString)) {
return new HashSet<>();
diff --git
a/geode-core/src/test/java/org/apache/geode/management/internal/configuration/ClusterConfigLocatorRestartDUnitTest.java
b/geode-core/src/test/java/org/apache/geode/management/internal/configuration/ClusterConfigLocatorRestartDUnitTest.java
new file mode 100644
index 0000000..bcc5ae0
--- /dev/null
+++
b/geode-core/src/test/java/org/apache/geode/management/internal/configuration/ClusterConfigLocatorRestartDUnitTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.configuration;
+
+import java.util.Properties;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import
org.apache.geode.distributed.internal.membership.gms.MembershipManagerHelper;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+@Category(DistributedTest.class)
+public class ClusterConfigLocatorRestartDUnitTest {
+
+ @Rule
+ public ClusterStartupRule rule = new ClusterStartupRule();
+
+ @Rule
+ public GfshCommandRule gfsh = new GfshCommandRule();
+
+ @Test
+ public void serverRestartsAfterLocatorReconnects() throws Exception {
+ Properties props = new Properties();
+ props.setProperty(ConfigurationProperties.MAX_WAIT_TIME_RECONNECT, "5000");
+ MemberVM locator0 = rule.startLocatorVM(0, props);
+
+ rule.startServerVM(1, props, locator0.getPort());
+ MemberVM server2 = rule.startServerVM(2, props, locator0.getPort());
+
+ server2.invokeAsync(() -> MembershipManagerHelper
+
.crashDistributedSystem(InternalDistributedSystem.getConnectedInstance()));
+ locator0.invokeAsync(() -> MembershipManagerHelper
+
.crashDistributedSystem(InternalDistributedSystem.getConnectedInstance()));
+
+ // Wait some time to reconnect
+ Thread.sleep(10000);
+
+ rule.startServerVM(3, locator0.getPort());
+
+ gfsh.connectAndVerify(locator0);
+ gfsh.executeAndAssertThat("list
members").statusIsSuccess().tableHasColumnOnlyWithValues("Name",
+ "locator-0", "server-1", "server-2", "server-3");
+ }
+
+ @Test
+ public void serverRestartsAfterOneLocatorDies() throws Exception {
+ IgnoredException.addIgnoredException("This member is no longer in the
membership view");
+ IgnoredException.addIgnoredException("This node is no longer in the
membership view");
+
+ // Otherwise we get a graceful shutdown...
+ Host.getHost(0).getVM(0).invoke(() -> {
+ if (InternalDistributedSystem.shutdownHook != null) {
+
Runtime.getRuntime().removeShutdownHook(InternalDistributedSystem.shutdownHook);
+ }
+ });
+
+ Properties props = new Properties();
+ props.setProperty(ConfigurationProperties.MAX_WAIT_TIME_RECONNECT, "5000");
+ MemberVM locator0 = rule.startLocatorVM(0, props);
+ MemberVM locator1 = rule.startLocatorVM(1, props, locator0.getPort());
+
+ MemberVM server2 = rule.startServerVM(2, props, locator0.getPort(),
locator1.getPort());
+ MemberVM server3 = rule.startServerVM(3, props, locator0.getPort(),
locator1.getPort());
+
+ // Go away and don't come back
+ locator0.invokeAsync(() -> System.exit(1));
+
+ server3.invokeAsync(() -> MembershipManagerHelper
+
.crashDistributedSystem(InternalDistributedSystem.getConnectedInstance()));
+
+ // Wait some time to reconnect
+ Thread.sleep(10000);
+
+ rule.startServerVM(4, locator1.getPort(), locator0.getPort());
+
+ gfsh.connectAndVerify(locator1);
+ gfsh.executeAndAssertThat("list
members").statusIsSuccess().tableHasColumnOnlyWithValues("Name",
+ "locator-1", "server-2", "server-3", "server-4");
+
+ // Recover the VM so that subsequent rule cleanup works
+ Host.getHost(0).getVM(0).bounce();
+ }
+
+}
--
To stop receiving notification emails like this one, please contact
[email protected].