This is an automated email from the ASF dual-hosted git repository.
jjramos pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 869cf67 GEODE-6977: Improve WAN AutoDiscovery Resilience (#3808)
869cf67 is described below
commit 869cf6765bedcb7b618b7a7702c9195ee6fbf1f8
Author: Juan José Ramos <[email protected]>
AuthorDate: Fri Jul 19 09:16:12 2019 -0300
GEODE-6977: Improve WAN AutoDiscovery Resilience (#3808)
- Added unit tests.
- Fixed minor warnings.
- Added retry logic to the LocatorMembershipListenerImpl.
- Added extra logging (WARN level) when the auto-discovery fails after
the configured retries.
- Changed the timeout when sending LocatorJoinMessage from 1 second to
the configured member-timeout.
---
.../locator/wan/LocatorMembershipListenerImpl.java | 248 ++++++++++----
.../locator/wan/LocatorMembershipListenerTest.java | 373 +++++++++++++++++++++
2 files changed, 548 insertions(+), 73 deletions(-)
diff --git
a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java
b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java
index f6e0d15..8d72402 100644
---
a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java
+++
b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java
@@ -14,12 +14,15 @@
*/
package org.apache.geode.cache.client.internal.locator.wan;
-import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadFactory;
import org.apache.logging.log4j.Logger;
@@ -28,6 +31,7 @@ import
org.apache.geode.distributed.internal.tcpserver.TcpClient;
import org.apache.geode.internal.CopyOnWriteHashSet;
import org.apache.geode.internal.admin.remote.DistributionLocatorId;
import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.logging.LoggingThreadFactory;
/**
* An implementation of
@@ -36,25 +40,30 @@ import org.apache.geode.internal.logging.LogService;
*
*/
public class LocatorMembershipListenerImpl implements
LocatorMembershipListener {
-
- private ConcurrentMap<Integer, Set<DistributionLocatorId>> allLocatorsInfo =
- new ConcurrentHashMap<Integer, Set<DistributionLocatorId>>();
-
- private ConcurrentMap<Integer, Set<String>> allServerLocatorsInfo =
- new ConcurrentHashMap<Integer, Set<String>>();
-
private static final Logger logger = LogService.getLogger();
-
- private DistributionConfig config;
-
- private TcpClient tcpClient;
+ static final int LOCATOR_DISTRIBUTION_RETRY_ATTEMPTS = 3;
+ static final String LOCATORS_DISTRIBUTOR_THREAD_NAME =
"LocatorsDistributorThread";
+ private static final String LISTENER_FAILURE_MESSAGE =
+ "Locator Membership listener could not exchange locator information
{}:{} with {}:{} after {} retry attempts";
+ private static final String LISTENER_FINAL_FAILURE_MESSAGE =
+ "Locator Membership listener permanently failed to exchange locator
information {}:{} with {}:{} after {} retry attempts";
private int port;
+ private DistributionConfig config;
+ private final TcpClient tcpClient;
+ private final ConcurrentMap<Integer, Set<String>> allServerLocatorsInfo =
+ new ConcurrentHashMap<>();
+ private final ConcurrentMap<Integer, Set<DistributionLocatorId>>
allLocatorsInfo =
+ new ConcurrentHashMap<>();
- public LocatorMembershipListenerImpl() {
+ LocatorMembershipListenerImpl() {
this.tcpClient = new TcpClient();
}
+ LocatorMembershipListenerImpl(TcpClient tcpClient) {
+ this.tcpClient = tcpClient;
+ }
+
@Override
public void setPort(int port) {
this.port = port;
@@ -69,76 +78,52 @@ public class LocatorMembershipListenerImpl implements
LocatorMembershipListener
* When the new locator is added to remote locator metadata, inform all
other locators in remote
* locator metadata about the new locator so that they can update their
remote locator metadata.
*
+ * @param distributedSystemId Id of the distributed system the joining locator belongs to.
+ * @param locator Id of the joining locator.
+ * @param sourceLocator Id of the locator that notified this locator about
the new one.
*/
-
@Override
public void locatorJoined(final int distributedSystemId, final
DistributionLocatorId locator,
final DistributionLocatorId sourceLocator) {
- Thread distributeLocator = new Thread(new Runnable() {
- @Override
- public void run() {
- ConcurrentMap<Integer, Set<DistributionLocatorId>> remoteLocators =
getAllLocatorsInfo();
- ArrayList<DistributionLocatorId> locatorsToRemove = new
ArrayList<DistributionLocatorId>();
-
- String localLocator = config.getStartLocator();
- DistributionLocatorId localLocatorId = null;
- if (localLocator.equals(DistributionConfig.DEFAULT_START_LOCATOR)) {
- localLocatorId = new DistributionLocatorId(port,
config.getBindAddress());
- } else {
- localLocatorId = new DistributionLocatorId(localLocator);
- }
- locatorsToRemove.add(localLocatorId);
- locatorsToRemove.add(locator);
- locatorsToRemove.add(sourceLocator);
-
- Map<Integer, Set<DistributionLocatorId>> localCopy =
- new HashMap<Integer, Set<DistributionLocatorId>>();
- for (Map.Entry<Integer, Set<DistributionLocatorId>> entry :
remoteLocators.entrySet()) {
- Set<DistributionLocatorId> value =
- new CopyOnWriteHashSet<DistributionLocatorId>(entry.getValue());
- localCopy.put(entry.getKey(), value);
- }
- for (Map.Entry<Integer, Set<DistributionLocatorId>> entry :
localCopy.entrySet()) {
- for (DistributionLocatorId removeLocId : locatorsToRemove) {
- if (entry.getValue().contains(removeLocId)) {
- entry.getValue().remove(removeLocId);
- }
- }
- for (DistributionLocatorId value : entry.getValue()) {
- try {
- tcpClient.requestToServer(value.getHost(),
- new LocatorJoinMessage(distributedSystemId, locator,
localLocatorId, ""), 1000,
- false);
- } catch (Exception e) {
- if (logger.isDebugEnabled()) {
- logger.debug(
- "Locator Membership listener could not exchange locator
information {}:{} with {}:{}",
- new Object[] {locator.getHostName(), locator.getPort(),
value.getHostName(),
- value.getPort()});
- }
- }
- try {
- tcpClient.requestToServer(locator.getHost(),
- new LocatorJoinMessage(entry.getKey(), value,
localLocatorId, ""), 1000, false);
- } catch (Exception e) {
- if (logger.isDebugEnabled()) {
- logger.debug(
- "Locator Membership listener could not exchange locator
information {}:{} with {}:{}",
- new Object[] {value.getHostName(), value.getPort(),
locator.getHostName(),
- locator.getPort()});
- }
- }
- }
- }
+ // DistributionLocatorId for local locator.
+ DistributionLocatorId localLocatorId;
+ String localLocator = config.getStartLocator();
+ if (localLocator.equals(DistributionConfig.DEFAULT_START_LOCATOR)) {
+ localLocatorId = new DistributionLocatorId(port,
config.getBindAddress());
+ } else {
+ localLocatorId = new DistributionLocatorId(localLocator);
+ }
+
+ // Make a local copy of the current list of known locators.
+ ConcurrentMap<Integer, Set<DistributionLocatorId>> remoteLocators =
getAllLocatorsInfo();
+ Map<Integer, Set<DistributionLocatorId>> localCopy = new HashMap<>();
+ for (Map.Entry<Integer, Set<DistributionLocatorId>> entry :
remoteLocators.entrySet()) {
+ Set<DistributionLocatorId> value = new
CopyOnWriteHashSet<>(entry.getValue());
+ localCopy.put(entry.getKey(), value);
+ }
+
+ // Remove locators that don't need to be notified (myself, the joining one
and the one that
+ // notified myself).
+ List<DistributionLocatorId> ignoreList = Arrays.asList(locator,
localLocatorId, sourceLocator);
+ for (Map.Entry<Integer, Set<DistributionLocatorId>> entry :
localCopy.entrySet()) {
+ for (DistributionLocatorId removeLocId : ignoreList) {
+ entry.getValue().remove(removeLocId);
}
- });
- distributeLocator.setDaemon(true);
- distributeLocator.start();
+ }
+
+ // Launch Locators Distributor thread.
+ Runnable distributeLocatorsRunnable = new
DistributeLocatorsRunnable(config.getMemberTimeout(),
+ tcpClient, localLocatorId, localCopy, locator, distributedSystemId);
+ ThreadFactory threadFactory =
+ new LoggingThreadFactory(LOCATORS_DISTRIBUTOR_THREAD_NAME, true);
+ Thread distributeLocatorsThread =
threadFactory.newThread(distributeLocatorsRunnable);
+ distributeLocatorsThread.start();
}
@Override
public Object handleRequest(Object request) {
Object response = null;
+
if (request instanceof RemoteLocatorJoinRequest) {
response = updateAllLocatorInfo((RemoteLocatorJoinRequest) request);
} else if (request instanceof LocatorJoinMessage) {
@@ -148,6 +133,7 @@ public class LocatorMembershipListenerImpl implements
LocatorMembershipListener
} else if (request instanceof RemoteLocatorRequest) {
response = getRemoteLocators((RemoteLocatorRequest) request);
}
+
return response;
}
@@ -166,6 +152,7 @@ public class LocatorMembershipListenerImpl implements
LocatorMembershipListener
return new RemoteLocatorJoinResponse(this.getAllLocatorsInfo());
}
+ @SuppressWarnings("unused")
private Object getPingResponse(RemoteLocatorPingRequest request) {
return new RemoteLocatorPingResponse();
}
@@ -213,4 +200,119 @@ public class LocatorMembershipListenerImpl implements
LocatorMembershipListener
allLocatorsInfo.clear();
allServerLocatorsInfo.clear();
}
+
+ /**
+  * Task executed by the locators distributor thread. It advertises the joining locator to every
+  * locator in {@code remoteLocators}, advertises each of those locators back to the joining
+  * locator, and then retries the messages that failed, up to
+  * {@code LOCATOR_DISTRIBUTION_RETRY_ATTEMPTS} times.
+  */
+ private static class DistributeLocatorsRunnable implements Runnable {
+   // Passed to TcpClient.requestToServer as its timeout and used as the pause (in
+   // milliseconds) after a failed retry.
+   private final int memberTimeout;
+   private final TcpClient tcpClient;
+   private final DistributionLocatorId localLocatorId;
+   private final Map<Integer, Set<DistributionLocatorId>> remoteLocators;
+   private final DistributionLocatorId joiningLocator;
+   private final int joiningLocatorDistributedSystemId;
+
+   /**
+    * @param memberTimeout timeout used for every request and as the pause after a failed retry.
+    * @param tcpClient client used to send the {@link LocatorJoinMessage}s.
+    * @param localLocatorId id placed as the source locator of every message sent.
+    * @param remoteLocators locators to notify, keyed by distributed system id.
+    * @param joiningLocator locator being advertised.
+    * @param joiningLocatorDistributedSystemId distributed system id of the joining locator.
+    */
+   DistributeLocatorsRunnable(int memberTimeout,
+       TcpClient tcpClient,
+       DistributionLocatorId localLocatorId,
+       Map<Integer, Set<DistributionLocatorId>> remoteLocators,
+       DistributionLocatorId joiningLocator,
+       int joiningLocatorDistributedSystemId) {
+
+     this.memberTimeout = memberTimeout;
+     this.tcpClient = tcpClient;
+     this.localLocatorId = localLocatorId;
+     this.remoteLocators = remoteLocators;
+     this.joiningLocator = joiningLocator;
+     this.joiningLocatorDistributedSystemId = joiningLocatorDistributedSystemId;
+   }
+
+   /**
+    * Sends the message to the target locator. If the request throws, the failure is logged at
+    * debug level and the message is recorded in {@code failedMessages} under the target locator
+    * so that it can be retried later.
+    *
+    * @param targetLocator locator that should receive the message.
+    * @param locatorJoinMessage message to send.
+    * @param failedMessages accumulator of undelivered messages, keyed by target locator.
+    */
+   void sendMessage(DistributionLocatorId targetLocator, LocatorJoinMessage locatorJoinMessage,
+       Map<DistributionLocatorId, Set<LocatorJoinMessage>> failedMessages) {
+     DistributionLocatorId advertisedLocator = locatorJoinMessage.getLocator();
+
+     try {
+       tcpClient.requestToServer(targetLocator.getHost(), locatorJoinMessage, memberTimeout,
+           false);
+     } catch (Exception exception) {
+       if (logger.isDebugEnabled()) {
+         logger.debug(LISTENER_FAILURE_MESSAGE,
+             new Object[] {advertisedLocator.getHostName(), advertisedLocator.getPort(),
+                 targetLocator.getHostName(), targetLocator.getPort(), 1, exception});
+       }
+
+       failedMessages.computeIfAbsent(targetLocator, key -> new HashSet<>())
+           .add(locatorJoinMessage);
+     }
+   }
+
+   /**
+    * Re-sends a previously failed message. A failure on the last allowed attempt is logged at
+    * WARN level; earlier failures are logged at debug level only.
+    *
+    * @param targetLocator locator that should receive the message.
+    * @param locatorJoinMessage message to send.
+    * @param retryAttempt number of the current retry attempt, starting at 1.
+    * @return {@code true} if the request completed without throwing, {@code false} otherwise.
+    */
+   boolean retryMessage(DistributionLocatorId targetLocator, LocatorJoinMessage locatorJoinMessage,
+       int retryAttempt) {
+     DistributionLocatorId advertisedLocator = locatorJoinMessage.getLocator();
+
+     try {
+       tcpClient.requestToServer(targetLocator.getHost(), locatorJoinMessage, memberTimeout,
+           false);
+
+       return true;
+     } catch (Exception exception) {
+       if (retryAttempt == LOCATOR_DISTRIBUTION_RETRY_ATTEMPTS) {
+         logger.warn(LISTENER_FINAL_FAILURE_MESSAGE,
+             new Object[] {advertisedLocator.getHostName(), advertisedLocator.getPort(),
+                 targetLocator.getHostName(), targetLocator.getPort(), retryAttempt, exception});
+       } else {
+         if (logger.isDebugEnabled()) {
+           logger.debug(LISTENER_FAILURE_MESSAGE,
+               new Object[] {advertisedLocator.getHostName(), advertisedLocator.getPort(),
+                   targetLocator.getHostName(), targetLocator.getPort(), retryAttempt, exception});
+         }
+       }
+
+       return false;
+     }
+   }
+
+   /**
+    * Retries a message once and, when the retry fails, sleeps for {@code memberTimeout} before
+    * returning so that consecutive retries are spaced out.
+    *
+    * @return {@code true} if the message was delivered and can be dropped from the pending set.
+    */
+   private boolean retryOrPause(DistributionLocatorId targetLocator,
+       LocatorJoinMessage locatorJoinMessage, int retryAttempt) {
+     if (retryMessage(targetLocator, locatorJoinMessage, retryAttempt)) {
+       return true;
+     }
+
+     // Sleep between retries.
+     try {
+       Thread.sleep(memberTimeout);
+     } catch (InterruptedException e) {
+       Thread.currentThread().interrupt();
+     }
+
+     return false;
+   }
+
+   @Override
+   public void run() {
+     Map<DistributionLocatorId, Set<LocatorJoinMessage>> failedMessages = new HashMap<>();
+     for (Map.Entry<Integer, Set<DistributionLocatorId>> entry : remoteLocators.entrySet()) {
+       for (DistributionLocatorId value : entry.getValue()) {
+         // Notify known remote locator about the advertised locator.
+         LocatorJoinMessage advertiseNewLocatorMessage = new LocatorJoinMessage(
+             joiningLocatorDistributedSystemId, joiningLocator, localLocatorId, "");
+         sendMessage(value, advertiseNewLocatorMessage, failedMessages);
+
+         // Notify the advertised locator about remote known locator.
+         LocatorJoinMessage advertiseKnownLocatorMessage =
+             new LocatorJoinMessage(entry.getKey(), value, localLocatorId, "");
+         sendMessage(joiningLocator, advertiseKnownLocatorMessage, failedMessages);
+       }
+     }
+
+     // Retry failed messages and remove those that succeed. Removal goes through removeIf
+     // because removing from a HashSet inside a for-each over that same set throws
+     // ConcurrentModificationException as soon as the iteration continues.
+     for (int attempt = 1; attempt <= LOCATOR_DISTRIBUTION_RETRY_ATTEMPTS
+         && !failedMessages.isEmpty(); attempt++) {
+       final int currentAttempt = attempt;
+
+       failedMessages.entrySet().removeIf(failedEntry -> {
+         DistributionLocatorId targetLocator = failedEntry.getKey();
+         Set<LocatorJoinMessage> joinMessages = failedEntry.getValue();
+         joinMessages.removeIf(
+             locatorJoinMessage -> retryOrPause(targetLocator, locatorJoinMessage,
+                 currentAttempt));
+
+         // Targets with nothing left to deliver are dropped so the loop can finish early.
+         return joinMessages.isEmpty();
+       });
+     }
+   }
+ }
}
diff --git
a/geode-wan/src/test/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerTest.java
b/geode-wan/src/test/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerTest.java
new file mode 100644
index 0000000..45035b1
--- /dev/null
+++
b/geode-wan/src/test/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerTest.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.client.internal.locator.wan;
+
+import static
org.apache.geode.cache.client.internal.locator.wan.LocatorMembershipListenerImpl.LOCATORS_DISTRIBUTOR_THREAD_NAME;
+import static
org.apache.geode.cache.client.internal.locator.wan.LocatorMembershipListenerImpl.LOCATOR_DISTRIBUTION_RETRY_ATTEMPTS;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.tcpserver.TcpClient;
+import org.apache.geode.internal.admin.remote.DistributionLocatorId;
+
+public class LocatorMembershipListenerTest {
+ private TcpClient tcpClient;
+ private LocatorMembershipListenerImpl locatorMembershipListener;
+
+ private DistributionLocatorId buildDistributionLocatorId(int port) {
+ return new DistributionLocatorId("localhost[" + port + "]");
+ }
+
+ private List<LocatorJoinMessage> buildPermutationsForClusterId(int dsId, int
locatorsAmount) {
+ int basePort = dsId * 10000;
+ List<LocatorJoinMessage> joinMessages = new ArrayList<>();
+
+ for (int i = 1; i <= locatorsAmount; i++) {
+ DistributionLocatorId sourceLocatorId =
buildDistributionLocatorId(basePort + i);
+
+ for (int j = 1; j <= locatorsAmount; j++) {
+ DistributionLocatorId distributionLocatorId =
buildDistributionLocatorId(basePort + j);
+ LocatorJoinMessage locatorJoinMessage =
+ new LocatorJoinMessage(dsId, distributionLocatorId,
sourceLocatorId, "");
+ joinMessages.add(locatorJoinMessage);
+ }
+ }
+
+ return joinMessages;
+ }
+
+ private void verifyMessagesSentBothWays(DistributionLocatorId sourceLocator,
+ int advertisedLocatorDsId, DistributionLocatorId advertisedLocator,
+ int initialTargetLocatorDsId, DistributionLocatorId initialTargetLocator)
+ throws IOException, ClassNotFoundException {
+ verify(tcpClient).requestToServer(initialTargetLocator.getHost(),
+ new LocatorJoinMessage(advertisedLocatorDsId, advertisedLocator,
sourceLocator, ""),
+ DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+ verify(tcpClient).requestToServer(advertisedLocator.getHost(),
+ new LocatorJoinMessage(initialTargetLocatorDsId, initialTargetLocator,
sourceLocator, ""),
+ DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+ }
+
+ // Waits for the first live thread whose name starts with the locators distributor thread
+ // name to finish; returns immediately when no such thread exists.
+ private void joinLocatorsDistributorThread() throws InterruptedException {
+   Optional<Thread> distributor = Thread.getAllStackTraces().keySet().stream()
+       .filter(candidate -> candidate.getName().startsWith(LOCATORS_DISTRIBUTOR_THREAD_NAME))
+       .findFirst();
+
+   if (!distributor.isPresent()) {
+     return;
+   }
+
+   distributor.get().join();
+ }
+
+ @Before
+ public void setUp() {
+ DistributionConfig distributionConfig = mock(DistributionConfig.class);
+
when(distributionConfig.getStartLocator()).thenReturn(DistributionConfig.DEFAULT_START_LOCATOR);
+ when(distributionConfig.getMemberTimeout())
+ .thenReturn(DistributionConfig.DEFAULT_MEMBER_TIMEOUT);
+
+ tcpClient = mock(TcpClient.class);
+ locatorMembershipListener = spy(new
LocatorMembershipListenerImpl(tcpClient));
+ locatorMembershipListener.setConfig(distributionConfig);
+ }
+
+ @Test
+ public void
handleRemoteLocatorPingRequestShouldReturnCorrectResponseWithoutUpdatingInternalStructures()
{
+ RemoteLocatorPingRequest remoteLocatorPingRequest = new
RemoteLocatorPingRequest();
+ Object response =
locatorMembershipListener.handleRequest(remoteLocatorPingRequest);
+
+
assertThat(response).isNotNull().isInstanceOf(RemoteLocatorPingResponse.class);
+ assertThat(locatorMembershipListener.getAllLocatorsInfo()).isEmpty();
+ assertThat(locatorMembershipListener.getAllServerLocatorsInfo()).isEmpty();
+ }
+
+ @Test
+ public void
handleRemoteLocatorRequestShouldReturnListOfKnownRemoteLocatorsForTheRequestedDsIdWithoutUpdatingInternalStructures()
{
+ RemoteLocatorRequest remoteLocatorRequest = new RemoteLocatorRequest(1,
"");
+ Set<String> cluster1Locators =
+ new HashSet<>(Arrays.asList("localhost[10101]", "localhost[10102]"));
+
when(locatorMembershipListener.getRemoteLocatorInfo(1)).thenReturn(cluster1Locators);
+
+ Object response =
locatorMembershipListener.handleRequest(remoteLocatorRequest);
+ assertThat(response).isNotNull().isInstanceOf(RemoteLocatorResponse.class);
+ assertThat(locatorMembershipListener.getAllLocatorsInfo()).isEmpty();
+ assertThat(locatorMembershipListener.getAllServerLocatorsInfo()).isEmpty();
+ assertThat(((RemoteLocatorResponse)
response).getLocators()).isEqualTo(cluster1Locators);
+ }
+
+ @Test
+ public void
handleRemoteLocatorJoinRequestShouldReturnAllKnownLocatorsAndUpdateInternalStructures()
{
+ DistributionLocatorId locator1Site1 = buildDistributionLocatorId(10101);
+ RemoteLocatorJoinRequest locator1Site1JoinRequest =
+ new RemoteLocatorJoinRequest(1, locator1Site1, "");
+ DistributionLocatorId locator1Site2 = buildDistributionLocatorId(20201);
+ RemoteLocatorJoinRequest locator1Site2JoinRequest =
+ new RemoteLocatorJoinRequest(2, locator1Site2, "");
+ DistributionLocatorId locator2Site2 = buildDistributionLocatorId(20202);
+ RemoteLocatorJoinRequest locator2Site2JoinRequest =
+ new RemoteLocatorJoinRequest(2, locator2Site2, "");
+
+ // First locator from site 1.
+ Object response =
locatorMembershipListener.handleRequest(locator1Site1JoinRequest);
+
assertThat(response).isNotNull().isInstanceOf(RemoteLocatorJoinResponse.class);
+ assertThat(((RemoteLocatorJoinResponse)
response).getLocators()).isNotNull().hasSize(1);
+ assertThat(((RemoteLocatorJoinResponse)
response).getLocators().get(1).contains(locator1Site1))
+ .isTrue();
+
+ // Two locators from site 2.
+ locatorMembershipListener.handleRequest(locator1Site2JoinRequest);
+ response =
locatorMembershipListener.handleRequest(locator2Site2JoinRequest);
+
assertThat(response).isNotNull().isInstanceOf(RemoteLocatorJoinResponse.class);
+ assertThat(((RemoteLocatorJoinResponse)
response).getLocators()).isNotNull().hasSize(2);
+ assertThat(((RemoteLocatorJoinResponse)
response).getLocators().get(1).size()).isEqualTo(1);
+ assertThat(((RemoteLocatorJoinResponse)
response).getLocators().get(1).contains(locator1Site1))
+ .isTrue();
+ assertThat(((RemoteLocatorJoinResponse)
response).getLocators().get(2).size()).isEqualTo(2);
+ assertThat(((RemoteLocatorJoinResponse)
response).getLocators().get(2).contains(locator1Site2))
+ .isTrue();
+ assertThat(((RemoteLocatorJoinResponse)
response).getLocators().get(2).contains(locator2Site2))
+ .isTrue();
+ }
+
+ @Test
+ public void handleLocatorJoinMessageShouldUpdateInternalStructures()
+     throws InterruptedException, ExecutionException {
+   int clusters = 4;
+   int locatorsPerCluster = 6;
+   List<LocatorJoinMessage> allJoinMessages = new ArrayList<>();
+
+   for (int i = 1; i <= clusters; i++) {
+     allJoinMessages.addAll(buildPermutationsForClusterId(i, locatorsPerCluster));
+   }
+
+   Collection<Callable<Object>> requests = new ArrayList<>();
+   allJoinMessages.forEach(
+       (request) -> requests.add(new HandlerCallable(locatorMembershipListener, request)));
+
+   // The pool is sized to the number of requests, so every request gets its own thread.
+   ExecutorService executorService = Executors.newFixedThreadPool(allJoinMessages.size());
+   try {
+     List<Future<Object>> futures = executorService.invokeAll(requests);
+     for (Future<Object> future : futures) {
+       Object response = future.get();
+       assertThat(response).isNull();
+     }
+   } finally {
+     // Release the pool threads even when an assertion or a task fails.
+     executorService.shutdownNow();
+   }
+
+   assertThat(locatorMembershipListener.getAllLocatorsInfo().size()).isEqualTo(clusters);
+   ConcurrentMap<Integer, Set<DistributionLocatorId>> locatorsPerClusterMap =
+       locatorMembershipListener.getAllLocatorsInfo();
+   locatorsPerClusterMap
+       .forEach((key, value) -> assertThat(value.size()).isEqualTo(locatorsPerCluster));
+ }
+
+ @Test
+ public void locatorJoinedShouldNotifyNobodyIfThereAreNoKnownLocators()
+ throws IOException, ClassNotFoundException, InterruptedException {
+ ConcurrentMap<Integer, Set<DistributionLocatorId>> allLocatorsInfo = new
ConcurrentHashMap<>();
+ DistributionLocatorId joiningLocator = buildDistributionLocatorId(20202);
+ DistributionLocatorId locator1Site1 = buildDistributionLocatorId(10101);
+
when(locatorMembershipListener.getAllLocatorsInfo()).thenReturn(allLocatorsInfo);
+
+ locatorMembershipListener.locatorJoined(2, joiningLocator, locator1Site1);
+ joinLocatorsDistributorThread();
+ verify(tcpClient, times(0)).requestToServer(any(InetSocketAddress.class),
+ any(LocatorJoinMessage.class), anyInt(), anyBoolean());
+ }
+
+ @Test
+ public void
locatorJoinedShouldNotifyKnownLocatorAboutTheJoiningLocatorAndJoiningLocatorAboutTheKnownOne()
+ throws IOException, ClassNotFoundException, InterruptedException {
+ ConcurrentMap<Integer, Set<DistributionLocatorId>> allLocatorsInfo = new
ConcurrentHashMap<>();
+ DistributionLocatorId joiningLocator = buildDistributionLocatorId(20202);
+ DistributionLocatorId locator1Site1 = buildDistributionLocatorId(10101);
+ DistributionLocatorId locator3Site3 = buildDistributionLocatorId(30303);
+ allLocatorsInfo.put(3, new
HashSet<>(Collections.singletonList(locator3Site3)));
+
when(locatorMembershipListener.getAllLocatorsInfo()).thenReturn(allLocatorsInfo);
+
+ locatorMembershipListener.locatorJoined(2, joiningLocator, locator1Site1);
+ joinLocatorsDistributorThread();
+ verifyMessagesSentBothWays(locator1Site1, 2, joiningLocator, 3,
locator3Site3);
+ }
+
+ @Test
+ public void
locatorJoinedShouldNotifyEveryKnownLocatorAboutTheJoiningLocatorAndJoiningLocatorAboutAllTheKnownLocators()
+ throws IOException, ClassNotFoundException, InterruptedException {
+ ConcurrentMap<Integer, Set<DistributionLocatorId>> allLocatorsInfo = new
ConcurrentHashMap<>();
+ DistributionLocatorId joiningLocator = buildDistributionLocatorId(10102);
+ DistributionLocatorId locator1Site1 = buildDistributionLocatorId(10101);
+ DistributionLocatorId locator3Site1 = buildDistributionLocatorId(10103);
+ DistributionLocatorId locator1Site2 = buildDistributionLocatorId(20201);
+ DistributionLocatorId locator2Site2 = buildDistributionLocatorId(20202);
+ DistributionLocatorId locator1Site3 = buildDistributionLocatorId(30301);
+ DistributionLocatorId locator2Site3 = buildDistributionLocatorId(30302);
+ DistributionLocatorId locator3Site3 = buildDistributionLocatorId(30303);
+ allLocatorsInfo.put(1, new HashSet<>(Arrays.asList(locator1Site1,
locator3Site1)));
+ allLocatorsInfo.put(2, new HashSet<>(Arrays.asList(locator1Site2,
locator2Site2)));
+ allLocatorsInfo.put(3,
+ new HashSet<>(Arrays.asList(locator1Site3, locator2Site3,
locator3Site3)));
+
when(locatorMembershipListener.getAllLocatorsInfo()).thenReturn(allLocatorsInfo);
+
+ locatorMembershipListener.locatorJoined(1, joiningLocator, locator1Site1);
+ joinLocatorsDistributorThread();
+ verifyMessagesSentBothWays(locator1Site1, 1, joiningLocator, 1,
locator3Site1);
+ verifyMessagesSentBothWays(locator1Site1, 1, joiningLocator, 2,
locator1Site2);
+ verifyMessagesSentBothWays(locator1Site1, 1, joiningLocator, 2,
locator2Site2);
+ verifyMessagesSentBothWays(locator1Site1, 1, joiningLocator, 3,
locator1Site3);
+ verifyMessagesSentBothWays(locator1Site1, 1, joiningLocator, 3,
locator2Site3);
+ verifyMessagesSentBothWays(locator1Site1, 1, joiningLocator, 3,
locator3Site3);
+ }
+
+ @Test
+ public void
locatorJoinedShouldRetryUpToTheConfiguredUpperBoundOnConnectionFailures()
+ throws IOException, ClassNotFoundException, InterruptedException {
+ ConcurrentMap<Integer, Set<DistributionLocatorId>> allLocatorsInfo = new
ConcurrentHashMap<>();
+ DistributionLocatorId joiningLocator = buildDistributionLocatorId(10102);
+ DistributionLocatorId locator1Site1 = buildDistributionLocatorId(10101);
+ DistributionLocatorId locator3Site1 = buildDistributionLocatorId(10103);
+ allLocatorsInfo.put(1, new
HashSet<>(Collections.singletonList(locator3Site1)));
+
when(locatorMembershipListener.getAllLocatorsInfo()).thenReturn(allLocatorsInfo);
+ when(tcpClient.requestToServer(locator3Site1.getHost(),
+ new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
+ DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false))
+ .thenThrow(new EOFException("Mock Exception"));
+
+ locatorMembershipListener.locatorJoined(1, joiningLocator, locator1Site1);
+ joinLocatorsDistributorThread();
+
+ verify(tcpClient, times(LOCATOR_DISTRIBUTION_RETRY_ATTEMPTS +
1)).requestToServer(
+ locator3Site1.getHost(),
+ new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
+ DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+ verify(tcpClient).requestToServer(joiningLocator.getHost(),
+ new LocatorJoinMessage(1, locator3Site1, locator1Site1, ""),
+ DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+ }
+
+ @Test
+ public void
locatorJoinedShouldNotRetryAgainAfterSuccessfulRetryOnConnectionFailures()
+ throws IOException, ClassNotFoundException, InterruptedException {
+ ConcurrentMap<Integer, Set<DistributionLocatorId>> allLocatorsInfo = new
ConcurrentHashMap<>();
+ DistributionLocatorId joiningLocator = buildDistributionLocatorId(10102);
+ DistributionLocatorId locator1Site1 = buildDistributionLocatorId(10101);
+ DistributionLocatorId locator3Site1 = buildDistributionLocatorId(10103);
+ allLocatorsInfo.put(1, new
HashSet<>(Collections.singletonList(locator3Site1)));
+
when(locatorMembershipListener.getAllLocatorsInfo()).thenReturn(allLocatorsInfo);
+ when(tcpClient.requestToServer(locator3Site1.getHost(),
+ new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
+ DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false))
+ .thenThrow(new EOFException("Mock Exception"))
+ .thenReturn(null);
+
+ locatorMembershipListener.locatorJoined(1, joiningLocator, locator1Site1);
+ joinLocatorsDistributorThread();
+
+ verify(tcpClient, times(2)).requestToServer(locator3Site1.getHost(),
+ new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
+ DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+ verify(tcpClient).requestToServer(joiningLocator.getHost(),
+ new LocatorJoinMessage(1, locator3Site1, locator1Site1, ""),
+ DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+ }
+
+ @Test
+ public void locatorJoinedShouldRetryOnlyFailedMessagesOnConnectionFailures()
+ throws IOException, ClassNotFoundException, InterruptedException {
+ ConcurrentMap<Integer, Set<DistributionLocatorId>> allLocatorsInfo = new
ConcurrentHashMap<>();
+ DistributionLocatorId joiningLocator = buildDistributionLocatorId(10102);
+ DistributionLocatorId locator1Site1 = buildDistributionLocatorId(10101);
+ DistributionLocatorId locator3Site1 = buildDistributionLocatorId(10103);
+ DistributionLocatorId locator1Site2 = buildDistributionLocatorId(20201);
+ DistributionLocatorId locator1Site3 = buildDistributionLocatorId(30301);
+ allLocatorsInfo.put(1, new HashSet<>(Arrays.asList(locator1Site1,
locator3Site1)));
+ allLocatorsInfo.put(2, new
HashSet<>(Collections.singletonList(locator1Site2)));
+ allLocatorsInfo.put(3, new
HashSet<>(Collections.singletonList(locator1Site3)));
+
when(locatorMembershipListener.getAllLocatorsInfo()).thenReturn(allLocatorsInfo);
+
+ // Fail on first 2 attempts and succeed on third attempt.
+ when(tcpClient.requestToServer(locator3Site1.getHost(),
+ new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
+ DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false))
+ .thenThrow(new EOFException("Mock Exception"))
+ .thenThrow(new EOFException("Mock Exception")).thenReturn(null);
+
+ // Fail always.
+ when(tcpClient.requestToServer(joiningLocator.getHost(),
+ new LocatorJoinMessage(3, locator1Site3, locator1Site1, ""),
+ DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false))
+ .thenThrow(new EOFException("Mock Exception"));
+
+ locatorMembershipListener.locatorJoined(1, joiningLocator, locator1Site1);
+ joinLocatorsDistributorThread();
+
+ verifyMessagesSentBothWays(locator1Site1, 1, joiningLocator, 2,
locator1Site2);
+ verify(tcpClient, times(3)).requestToServer(locator3Site1.getHost(),
+ new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
+ DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+ verify(tcpClient).requestToServer(joiningLocator.getHost(),
+ new LocatorJoinMessage(1, locator3Site1, locator1Site1, ""),
+ DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+ verify(tcpClient).requestToServer(locator1Site3.getHost(),
+ new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
+ DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+ verify(tcpClient, times(LOCATOR_DISTRIBUTION_RETRY_ATTEMPTS +
1)).requestToServer(
+ joiningLocator.getHost(),
+ new LocatorJoinMessage(3, locator1Site3, locator1Site1, ""),
+ DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+ }
+
+ private static class HandlerCallable implements Callable<Object> {
+ private final Object request;
+ private final LocatorMembershipListenerImpl locatorMembershipListener;
+
+ HandlerCallable(LocatorMembershipListenerImpl locatorMembershipListener,
Object request) {
+ this.request = request;
+ this.locatorMembershipListener = locatorMembershipListener;
+ }
+
+ @Override
+ public Object call() {
+ return locatorMembershipListener.handleRequest(request);
+ }
+ }
+}