This is an automated email from the ASF dual-hosted git repository.
jinmeiliao pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 73f6783 GEODE-8574: ClusterManagementService should not throw ClassCastException (#5596)
73f6783 is described below
commit 73f6783b07f1151c1617978fb57822ade5b71414
Author: Jinmei Liao <[email protected]>
AuthorDate: Wed Oct 7 11:05:28 2020 -0700
GEODE-8574: ClusterManagementService should not throw ClassCastException (#5596)
---
.../api/LocatorClusterManagementService.java | 40 +++++++++++++++-------
.../api/LocatorClusterManagementServiceTest.java | 24 ++++++++++---
2 files changed, 47 insertions(+), 17 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/api/LocatorClusterManagementService.java b/geode-core/src/main/java/org/apache/geode/management/internal/api/LocatorClusterManagementService.java
index f67ae83..f8d9f52 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/api/LocatorClusterManagementService.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/api/LocatorClusterManagementService.java
@@ -29,7 +29,6 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -46,7 +45,6 @@ import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.execute.Execution;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionService;
-import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.distributed.DistributedMember;
import
org.apache.geode.distributed.internal.InternalConfigurationPersistenceService;
@@ -236,8 +234,7 @@ public class LocatorClusterManagementService implements
ClusterManagementService
ClusterManagementRealizationResult result = new
ClusterManagementRealizationResult();
// execute function on all targeted members
- List<RealizationResult> functionResults = executeAndGetFunctionResult(
- new CacheRealizationFunction(),
+ List<RealizationResult> functionResults =
executeCacheRealizationFunction(
config, CacheElementOperation.CREATE,
targetedMembers);
functionResults.forEach(result::addMemberStatus);
@@ -338,8 +335,7 @@ public class LocatorClusterManagementService implements
ClusterManagementService
// execute function on all members
ClusterManagementRealizationResult result = new
ClusterManagementRealizationResult();
- List<RealizationResult> functionResults = executeAndGetFunctionResult(
- new CacheRealizationFunction(),
+ List<RealizationResult> functionResults =
executeCacheRealizationFunction(
config, CacheElementOperation.DELETE,
memberValidator.findServers(groupsWithThisElement));
functionResults.forEach(result::addMemberStatus);
@@ -446,7 +442,7 @@ public class LocatorClusterManagementService implements
ClusterManagementService
members = Collections.singleton(members.iterator().next());
}
- List<R> runtimeInfos = executeAndGetFunctionResult(new
CacheRealizationFunction(),
+ List<R> runtimeInfos = executeCacheRealizationFunction(
element, CacheElementOperation.GET,
members);
response.setRuntimeInfo(runtimeInfos);
@@ -593,14 +589,15 @@ public class LocatorClusterManagementService implements
ClusterManagementService
}
@VisibleForTesting
- <R> List<R> executeAndGetFunctionResult(Function function,
AbstractConfiguration configuration,
+ <R> List<R> executeCacheRealizationFunction(AbstractConfiguration
configuration,
CacheElementOperation operation,
Set<DistributedMember> targetMembers) {
if (targetMembers.size() == 0) {
return Collections.emptyList();
}
- List<R> results = new ArrayList();
+ Function function = new CacheRealizationFunction();
+
File file = null;
if (configuration instanceof HasFile) {
@@ -611,9 +608,8 @@ public class LocatorClusterManagementService implements
ClusterManagementService
Execution execution = FunctionService.onMembers(targetMembers)
.setArguments(Arrays.asList(configuration, operation, null));
((AbstractExecution) execution).setIgnoreDepartedMembers(true);
- ResultCollector rc = execution.execute(function);
- return ((List<R>) rc.getResult()).stream().filter(Objects::nonNull)
- .collect(Collectors.toList());
+ List<?> functionResults = (List<?>)
execution.execute(function).getResult();
+ return cleanResults(functionResults);
}
// if we have file arguments, we need to export the file input stream for
each member
@@ -623,6 +619,7 @@ public class LocatorClusterManagementService implements
ClusterManagementService
.getManagementAgent();
exporter = agent.getRemoteStreamExporter();
+ List<R> results = new ArrayList();
for (DistributedMember member : targetMembers) {
FileInputStream fileInputStream = null;
SimpleRemoteInputStream inputStream = null;
@@ -634,7 +631,8 @@ public class LocatorClusterManagementService implements
ClusterManagementService
Execution execution = FunctionService.onMember(member)
.setArguments(Arrays.asList(configuration, operation,
remoteInputStream));
((AbstractExecution) execution).setIgnoreDepartedMembers(true);
- results.add(((List<R>)
execution.execute(function).getResult()).get(0));
+ List<R> functionResults = cleanResults((List<?>)
execution.execute(function).getResult());
+ results.addAll(functionResults);
} catch (IOException e) {
raise(StatusCode.ILLEGAL_ARGUMENT, "Invalid file: " +
file.getAbsolutePath());
} finally {
@@ -653,7 +651,23 @@ public class LocatorClusterManagementService implements
ClusterManagementService
}
}
}
+ return results;
+ }
+ @VisibleForTesting
+ <R> List<R> cleanResults(List<?> functionResults) {
+ List<R> results = new ArrayList<>();
+ for (Object functionResult : functionResults) {
+ if (functionResult == null) {
+ continue;
+ }
+ if (functionResult instanceof Throwable) {
+ // log the exception and continue
+ logger.warn("Error executing CacheRealizationFunction.", (Throwable)
functionResult);
+ continue;
+ }
+ results.add((R) functionResult);
+ }
return results;
}
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/api/LocatorClusterManagementServiceTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/api/LocatorClusterManagementServiceTest.java
index 9506e8d..c50e669 100644
--- a/geode-core/src/test/java/org/apache/geode/management/internal/api/LocatorClusterManagementServiceTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/api/LocatorClusterManagementServiceTest.java
@@ -50,6 +50,7 @@ import org.junit.Test;
import org.apache.geode.cache.configuration.CacheConfig;
import org.apache.geode.cache.configuration.GatewayReceiverConfig;
import org.apache.geode.cache.configuration.RegionConfig;
+import org.apache.geode.cache.execute.FunctionInvocationTargetException;
import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
@@ -216,7 +217,7 @@ public class LocatorClusterManagementServiceTest {
functionResults.add(new RealizationResult().setMemberName("member1"));
functionResults.add(
new
RealizationResult().setMemberName("member2").setSuccess(false).setMessage("failed"));
- doReturn(functionResults).when(service).executeAndGetFunctionResult(any(),
any(), any(), any());
+
doReturn(functionResults).when(service).executeCacheRealizationFunction(any(),
any(), any());
doReturn(Collections.singleton(mock(DistributedMember.class))).when(memberValidator)
.findServers();
@@ -232,7 +233,7 @@ public class LocatorClusterManagementServiceTest {
List<RealizationResult> functionResults = new ArrayList<>();
functionResults.add(new RealizationResult().setMemberName("member1"));
functionResults.add(new RealizationResult().setMemberName("member2"));
- doReturn(functionResults).when(service).executeAndGetFunctionResult(any(),
any(), any(), any());
+
doReturn(functionResults).when(service).executeCacheRealizationFunction(any(),
any(), any());
doReturn(Collections.singleton(mock(DistributedMember.class))).when(memberValidator)
.findServers();
@@ -337,7 +338,7 @@ public class LocatorClusterManagementServiceTest {
functionResults.add(new RealizationResult().setMemberName("member1"));
functionResults.add(
new
RealizationResult().setMemberName("member2").setSuccess(false).setMessage("failed"));
- doReturn(functionResults).when(service).executeAndGetFunctionResult(any(),
any(), any(), any());
+
doReturn(functionResults).when(service).executeCacheRealizationFunction(any(),
any(), any());
doReturn(new String[]
{"cluster"}).when(memberValidator).findGroupsWithThisElement(any(),
any());
@@ -365,7 +366,7 @@ public class LocatorClusterManagementServiceTest {
List<RealizationResult> functionResults = new ArrayList<>();
functionResults.add(new RealizationResult().setMemberName("member1"));
functionResults.add(new RealizationResult().setMemberName("member2"));
- doReturn(functionResults).when(service).executeAndGetFunctionResult(any(),
any(), any(), any());
+
doReturn(functionResults).when(service).executeCacheRealizationFunction(any(),
any(), any());
doReturn(new String[]
{"cluster"}).when(memberValidator).findGroupsWithThisElement(any(),
any());
@@ -600,4 +601,19 @@ public class LocatorClusterManagementServiceTest {
assertThat(result.getStatusMessage()).isEqualTo(
"Successfully updated configuration for group1. Failed to update
configuration for group2.");
}
+
+ @Test
+ public void cleanResultsShouldCleanOutExceptionsAndNull() throws Exception {
+ List functionResults = new ArrayList<>();
+ MemberInformation memberInfo = new MemberInformation();
+ memberInfo.setId("server-1");
+ functionResults.add(memberInfo);
+ functionResults.add(new FunctionInvocationTargetException("Not
available"));
+ functionResults.add(null);
+
+ List<MemberInformation> results = service.cleanResults(functionResults);
+ assertThat(results).hasSize(1)
+ .extracting(MemberInformation::getId)
+ .containsExactly("server-1");
+ }
}