This is an automated email from the ASF dual-hosted git repository.

jjramos pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 869cf67  GEODE-6977: Improve WAN AutoDiscovery Resilience (#3808)
869cf67 is described below

commit 869cf6765bedcb7b618b7a7702c9195ee6fbf1f8
Author: Juan José Ramos <[email protected]>
AuthorDate: Fri Jul 19 09:16:12 2019 -0300

    GEODE-6977: Improve WAN AutoDiscovery Resilience (#3808)
    
    - Added unit tests.
    - Fixed minor warnings.
    - Added retry logic to the LocatorMembershipListenerImpl.
    - Added extra logging (WARN level) when the auto-discovery fails after
      the configured retries.
    - Changed the timeout when sending LocatorJoinMessage from 1 second to
      the configured member-timeout.
---
 .../locator/wan/LocatorMembershipListenerImpl.java | 248 ++++++++++----
 .../locator/wan/LocatorMembershipListenerTest.java | 373 +++++++++++++++++++++
 2 files changed, 548 insertions(+), 73 deletions(-)

diff --git a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java
index f6e0d15..8d72402 100644
--- a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java
@@ -14,12 +14,15 @@
  */
 package org.apache.geode.cache.client.internal.locator.wan;
 
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadFactory;
 
 import org.apache.logging.log4j.Logger;
 
@@ -28,6 +31,7 @@ import org.apache.geode.distributed.internal.tcpserver.TcpClient;
 import org.apache.geode.internal.CopyOnWriteHashSet;
 import org.apache.geode.internal.admin.remote.DistributionLocatorId;
 import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.logging.LoggingThreadFactory;
 
 /**
  * An implementation of
@@ -36,25 +40,30 @@ import org.apache.geode.internal.logging.LogService;
  *
  */
 public class LocatorMembershipListenerImpl implements 
LocatorMembershipListener {
-
-  private ConcurrentMap<Integer, Set<DistributionLocatorId>> allLocatorsInfo =
-      new ConcurrentHashMap<Integer, Set<DistributionLocatorId>>();
-
-  private ConcurrentMap<Integer, Set<String>> allServerLocatorsInfo =
-      new ConcurrentHashMap<Integer, Set<String>>();
-
   private static final Logger logger = LogService.getLogger();
-
-  private DistributionConfig config;
-
-  private TcpClient tcpClient;
+  static final int LOCATOR_DISTRIBUTION_RETRY_ATTEMPTS = 3;
+  static final String LOCATORS_DISTRIBUTOR_THREAD_NAME = 
"LocatorsDistributorThread";
+  private static final String LISTENER_FAILURE_MESSAGE =
+      "Locator Membership listener could not exchange locator information 
{}:{} with {}:{} after {} retry attempts";
+  private static final String LISTENER_FINAL_FAILURE_MESSAGE =
+      "Locator Membership listener permanently failed to exchange locator 
information {}:{} with {}:{} after {} retry attempts";
 
   private int port;
+  private DistributionConfig config;
+  private final TcpClient tcpClient;
+  private final ConcurrentMap<Integer, Set<String>> allServerLocatorsInfo =
+      new ConcurrentHashMap<>();
+  private final ConcurrentMap<Integer, Set<DistributionLocatorId>> 
allLocatorsInfo =
+      new ConcurrentHashMap<>();
 
-  public LocatorMembershipListenerImpl() {
+  LocatorMembershipListenerImpl() {
     this.tcpClient = new TcpClient();
   }
 
+  LocatorMembershipListenerImpl(TcpClient tcpClient) {
+    this.tcpClient = tcpClient;
+  }
+
   @Override
   public void setPort(int port) {
     this.port = port;
@@ -69,76 +78,52 @@ public class LocatorMembershipListenerImpl implements LocatorMembershipListener
    * When the new locator is added to remote locator metadata, inform all 
other locators in remote
    * locator metadata about the new locator so that they can update their 
remote locator metadata.
    *
+   * @param distributedSystemId Id of the distributed system the joining locator belongs to.
+   * @param locator Id of the joining locator.
+   * @param sourceLocator Id of the locator that notified this locator about 
the new one.
    */
-
   @Override
   public void locatorJoined(final int distributedSystemId, final 
DistributionLocatorId locator,
       final DistributionLocatorId sourceLocator) {
-    Thread distributeLocator = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        ConcurrentMap<Integer, Set<DistributionLocatorId>> remoteLocators = 
getAllLocatorsInfo();
-        ArrayList<DistributionLocatorId> locatorsToRemove = new 
ArrayList<DistributionLocatorId>();
-
-        String localLocator = config.getStartLocator();
-        DistributionLocatorId localLocatorId = null;
-        if (localLocator.equals(DistributionConfig.DEFAULT_START_LOCATOR)) {
-          localLocatorId = new DistributionLocatorId(port, 
config.getBindAddress());
-        } else {
-          localLocatorId = new DistributionLocatorId(localLocator);
-        }
-        locatorsToRemove.add(localLocatorId);
-        locatorsToRemove.add(locator);
-        locatorsToRemove.add(sourceLocator);
-
-        Map<Integer, Set<DistributionLocatorId>> localCopy =
-            new HashMap<Integer, Set<DistributionLocatorId>>();
-        for (Map.Entry<Integer, Set<DistributionLocatorId>> entry : 
remoteLocators.entrySet()) {
-          Set<DistributionLocatorId> value =
-              new CopyOnWriteHashSet<DistributionLocatorId>(entry.getValue());
-          localCopy.put(entry.getKey(), value);
-        }
-        for (Map.Entry<Integer, Set<DistributionLocatorId>> entry : 
localCopy.entrySet()) {
-          for (DistributionLocatorId removeLocId : locatorsToRemove) {
-            if (entry.getValue().contains(removeLocId)) {
-              entry.getValue().remove(removeLocId);
-            }
-          }
-          for (DistributionLocatorId value : entry.getValue()) {
-            try {
-              tcpClient.requestToServer(value.getHost(),
-                  new LocatorJoinMessage(distributedSystemId, locator, 
localLocatorId, ""), 1000,
-                  false);
-            } catch (Exception e) {
-              if (logger.isDebugEnabled()) {
-                logger.debug(
-                    "Locator Membership listener could not exchange locator 
information {}:{} with {}:{}",
-                    new Object[] {locator.getHostName(), locator.getPort(), 
value.getHostName(),
-                        value.getPort()});
-              }
-            }
-            try {
-              tcpClient.requestToServer(locator.getHost(),
-                  new LocatorJoinMessage(entry.getKey(), value, 
localLocatorId, ""), 1000, false);
-            } catch (Exception e) {
-              if (logger.isDebugEnabled()) {
-                logger.debug(
-                    "Locator Membership listener could not exchange locator 
information {}:{} with {}:{}",
-                    new Object[] {value.getHostName(), value.getPort(), 
locator.getHostName(),
-                        locator.getPort()});
-              }
-            }
-          }
-        }
+    // DistributionLocatorId for local locator.
+    DistributionLocatorId localLocatorId;
+    String localLocator = config.getStartLocator();
+    if (localLocator.equals(DistributionConfig.DEFAULT_START_LOCATOR)) {
+      localLocatorId = new DistributionLocatorId(port, 
config.getBindAddress());
+    } else {
+      localLocatorId = new DistributionLocatorId(localLocator);
+    }
+
+    // Make a local copy of the current list of known locators.
+    ConcurrentMap<Integer, Set<DistributionLocatorId>> remoteLocators = 
getAllLocatorsInfo();
+    Map<Integer, Set<DistributionLocatorId>> localCopy = new HashMap<>();
+    for (Map.Entry<Integer, Set<DistributionLocatorId>> entry : 
remoteLocators.entrySet()) {
+      Set<DistributionLocatorId> value = new 
CopyOnWriteHashSet<>(entry.getValue());
+      localCopy.put(entry.getKey(), value);
+    }
+
+    // Remove locators that don't need to be notified (myself, the joining one 
and the one that
+    // notified myself).
+    List<DistributionLocatorId> ignoreList = Arrays.asList(locator, 
localLocatorId, sourceLocator);
+    for (Map.Entry<Integer, Set<DistributionLocatorId>> entry : 
localCopy.entrySet()) {
+      for (DistributionLocatorId removeLocId : ignoreList) {
+        entry.getValue().remove(removeLocId);
       }
-    });
-    distributeLocator.setDaemon(true);
-    distributeLocator.start();
+    }
+
+    // Launch Locators Distributor thread.
+    Runnable distributeLocatorsRunnable = new 
DistributeLocatorsRunnable(config.getMemberTimeout(),
+        tcpClient, localLocatorId, localCopy, locator, distributedSystemId);
+    ThreadFactory threadFactory =
+        new LoggingThreadFactory(LOCATORS_DISTRIBUTOR_THREAD_NAME, true);
+    Thread distributeLocatorsThread = 
threadFactory.newThread(distributeLocatorsRunnable);
+    distributeLocatorsThread.start();
   }
 
   @Override
   public Object handleRequest(Object request) {
     Object response = null;
+
     if (request instanceof RemoteLocatorJoinRequest) {
       response = updateAllLocatorInfo((RemoteLocatorJoinRequest) request);
     } else if (request instanceof LocatorJoinMessage) {
@@ -148,6 +133,7 @@ public class LocatorMembershipListenerImpl implements LocatorMembershipListener
     } else if (request instanceof RemoteLocatorRequest) {
       response = getRemoteLocators((RemoteLocatorRequest) request);
     }
+
     return response;
   }
 
@@ -166,6 +152,7 @@ public class LocatorMembershipListenerImpl implements LocatorMembershipListener
     return new RemoteLocatorJoinResponse(this.getAllLocatorsInfo());
   }
 
+  @SuppressWarnings("unused")
   private Object getPingResponse(RemoteLocatorPingRequest request) {
     return new RemoteLocatorPingResponse();
   }
@@ -213,4 +200,119 @@ public class LocatorMembershipListenerImpl implements LocatorMembershipListener
     allLocatorsInfo.clear();
     allServerLocatorsInfo.clear();
   }
+
+  private static class DistributeLocatorsRunnable implements Runnable {
+    private final int memberTimeout;
+    private final TcpClient tcpClient;
+    private final DistributionLocatorId localLocatorId;
+    private final Map<Integer, Set<DistributionLocatorId>> remoteLocators;
+    private final DistributionLocatorId joiningLocator;
+    private final int joiningLocatorDistributedSystemId;
+
+    DistributeLocatorsRunnable(int memberTimeout,
+        TcpClient tcpClient,
+        DistributionLocatorId localLocatorId,
+        Map<Integer, Set<DistributionLocatorId>> remoteLocators,
+        DistributionLocatorId joiningLocator,
+        int joiningLocatorDistributedSystemId) {
+
+      this.memberTimeout = memberTimeout;
+      this.tcpClient = tcpClient;
+      this.localLocatorId = localLocatorId;
+      this.remoteLocators = remoteLocators;
+      this.joiningLocator = joiningLocator;
+      this.joiningLocatorDistributedSystemId = 
joiningLocatorDistributedSystemId;
+    }
+
+    void sendMessage(DistributionLocatorId targetLocator, LocatorJoinMessage 
locatorJoinMessage,
+        Map<DistributionLocatorId, Set<LocatorJoinMessage>> failedMessages) {
+      DistributionLocatorId advertisedLocator = 
locatorJoinMessage.getLocator();
+
+      try {
+        tcpClient.requestToServer(targetLocator.getHost(), locatorJoinMessage, 
memberTimeout,
+            false);
+      } catch (Exception exception) {
+        if (logger.isDebugEnabled()) {
+          logger.debug(LISTENER_FAILURE_MESSAGE,
+              new Object[] {advertisedLocator.getHostName(), 
advertisedLocator.getPort(),
+                  targetLocator.getHostName(), targetLocator.getPort(), 1, 
exception});
+        }
+
+        if (!failedMessages.containsKey(targetLocator)) {
+          failedMessages.put(targetLocator, new HashSet<>());
+        }
+
+        failedMessages.get(targetLocator).add(locatorJoinMessage);
+      }
+    }
+
+    boolean retryMessage(DistributionLocatorId targetLocator, 
LocatorJoinMessage locatorJoinMessage,
+        int retryAttempt) {
+      DistributionLocatorId advertisedLocator = 
locatorJoinMessage.getLocator();
+
+      try {
+        tcpClient.requestToServer(targetLocator.getHost(), locatorJoinMessage, 
memberTimeout,
+            false);
+
+        return true;
+      } catch (Exception exception) {
+        if (retryAttempt == LOCATOR_DISTRIBUTION_RETRY_ATTEMPTS) {
+          logger.warn(LISTENER_FINAL_FAILURE_MESSAGE,
+              new Object[] {advertisedLocator.getHostName(), 
advertisedLocator.getPort(),
+                  targetLocator.getHostName(), targetLocator.getPort(), 
retryAttempt, exception});
+        } else {
+          if (logger.isDebugEnabled()) {
+            logger.debug(LISTENER_FAILURE_MESSAGE,
+                new Object[] {advertisedLocator.getHostName(), 
advertisedLocator.getPort(),
+                    targetLocator.getHostName(), targetLocator.getPort(), 
retryAttempt, exception});
+          }
+        }
+
+        return false;
+      }
+    }
+
+    @Override
+    public void run() {
+      Map<DistributionLocatorId, Set<LocatorJoinMessage>> failedMessages = new HashMap<>();
+      for (Map.Entry<Integer, Set<DistributionLocatorId>> entry : remoteLocators.entrySet()) {
+        for (DistributionLocatorId value : entry.getValue()) {
+          // Notify known remote locator about the advertised locator.
+          LocatorJoinMessage advertiseNewLocatorMessage = new LocatorJoinMessage(
+              joiningLocatorDistributedSystemId, joiningLocator, localLocatorId, "");
+          sendMessage(value, advertiseNewLocatorMessage, failedMessages);
+
+          // Notify the advertised locator about remote known locator.
+          LocatorJoinMessage advertiseKnownLocatorMessage =
+              new LocatorJoinMessage(entry.getKey(), value, localLocatorId, "");
+          sendMessage(joiningLocator, advertiseKnownLocatorMessage, failedMessages);
+        }
+      }
+
+      // Retry failed messages and remove those that succeed.
+      if (!failedMessages.isEmpty()) {
+        for (int attempt = 1; attempt <= LOCATOR_DISTRIBUTION_RETRY_ATTEMPTS; attempt++) {
+          // Each pass walks a snapshot of the pending set so successes can be removed safely.
+          for (Map.Entry<DistributionLocatorId, Set<LocatorJoinMessage>> entry : failedMessages
+              .entrySet()) {
+            DistributionLocatorId targetLocator = entry.getKey();
+            Set<LocatorJoinMessage> joinMessages = entry.getValue();
+
+            for (LocatorJoinMessage locatorJoinMessage : new HashSet<>(joinMessages)) {
+              if (retryMessage(targetLocator, locatorJoinMessage, attempt)) {
+                joinMessages.remove(locatorJoinMessage);
+              } else {
+                // Sleep between retries.
+                try {
+                  Thread.sleep(memberTimeout);
+                } catch (InterruptedException e) {
+                  Thread.currentThread().interrupt();
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+  }
 }
diff --git a/geode-wan/src/test/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerTest.java b/geode-wan/src/test/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerTest.java
new file mode 100644
index 0000000..45035b1
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerTest.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.client.internal.locator.wan;
+
+import static 
org.apache.geode.cache.client.internal.locator.wan.LocatorMembershipListenerImpl.LOCATORS_DISTRIBUTOR_THREAD_NAME;
+import static 
org.apache.geode.cache.client.internal.locator.wan.LocatorMembershipListenerImpl.LOCATOR_DISTRIBUTION_RETRY_ATTEMPTS;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.tcpserver.TcpClient;
+import org.apache.geode.internal.admin.remote.DistributionLocatorId;
+
+public class LocatorMembershipListenerTest {
+  private TcpClient tcpClient;
+  private LocatorMembershipListenerImpl locatorMembershipListener;
+
+  private DistributionLocatorId buildDistributionLocatorId(int port) {
+    return new DistributionLocatorId("localhost[" + port + "]");
+  }
+
+  private List<LocatorJoinMessage> buildPermutationsForClusterId(int dsId, int 
locatorsAmount) {
+    int basePort = dsId * 10000;
+    List<LocatorJoinMessage> joinMessages = new ArrayList<>();
+
+    for (int i = 1; i <= locatorsAmount; i++) {
+      DistributionLocatorId sourceLocatorId = 
buildDistributionLocatorId(basePort + i);
+
+      for (int j = 1; j <= locatorsAmount; j++) {
+        DistributionLocatorId distributionLocatorId = 
buildDistributionLocatorId(basePort + j);
+        LocatorJoinMessage locatorJoinMessage =
+            new LocatorJoinMessage(dsId, distributionLocatorId, 
sourceLocatorId, "");
+        joinMessages.add(locatorJoinMessage);
+      }
+    }
+
+    return joinMessages;
+  }
+
+  private void verifyMessagesSentBothWays(DistributionLocatorId sourceLocator,
+      int advertisedLocatorDsId, DistributionLocatorId advertisedLocator,
+      int initialTargetLocatorDsId, DistributionLocatorId initialTargetLocator)
+      throws IOException, ClassNotFoundException {
+    verify(tcpClient).requestToServer(initialTargetLocator.getHost(),
+        new LocatorJoinMessage(advertisedLocatorDsId, advertisedLocator, 
sourceLocator, ""),
+        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+    verify(tcpClient).requestToServer(advertisedLocator.getHost(),
+        new LocatorJoinMessage(initialTargetLocatorDsId, initialTargetLocator, 
sourceLocator, ""),
+        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+  }
+
+  private void joinLocatorsDistributorThread() throws InterruptedException {
+    Set<Thread> threads = Thread.getAllStackTraces().keySet();
+    Optional<Thread> distributorThread = threads.stream()
+        .filter(t -> t.getName().startsWith(LOCATORS_DISTRIBUTOR_THREAD_NAME))
+        .findFirst();
+
+    if (distributorThread.isPresent()) {
+      distributorThread.get().join();
+    }
+  }
+
+  @Before
+  public void setUp() {
+    DistributionConfig distributionConfig = mock(DistributionConfig.class);
+    
when(distributionConfig.getStartLocator()).thenReturn(DistributionConfig.DEFAULT_START_LOCATOR);
+    when(distributionConfig.getMemberTimeout())
+        .thenReturn(DistributionConfig.DEFAULT_MEMBER_TIMEOUT);
+
+    tcpClient = mock(TcpClient.class);
+    locatorMembershipListener = spy(new 
LocatorMembershipListenerImpl(tcpClient));
+    locatorMembershipListener.setConfig(distributionConfig);
+  }
+
+  @Test
+  public void 
handleRemoteLocatorPingRequestShouldReturnCorrectResponseWithoutUpdatingInternalStructures()
 {
+    RemoteLocatorPingRequest remoteLocatorPingRequest = new 
RemoteLocatorPingRequest();
+    Object response = 
locatorMembershipListener.handleRequest(remoteLocatorPingRequest);
+
+    
assertThat(response).isNotNull().isInstanceOf(RemoteLocatorPingResponse.class);
+    assertThat(locatorMembershipListener.getAllLocatorsInfo()).isEmpty();
+    assertThat(locatorMembershipListener.getAllServerLocatorsInfo()).isEmpty();
+  }
+
+  @Test
+  public void 
handleRemoteLocatorRequestShouldReturnListOfKnownRemoteLocatorsForTheRequestedDsIdWithoutUpdatingInternalStructures()
 {
+    RemoteLocatorRequest remoteLocatorRequest = new RemoteLocatorRequest(1, 
"");
+    Set<String> cluster1Locators =
+        new HashSet<>(Arrays.asList("localhost[10101]", "localhost[10102]"));
+    
when(locatorMembershipListener.getRemoteLocatorInfo(1)).thenReturn(cluster1Locators);
+
+    Object response = 
locatorMembershipListener.handleRequest(remoteLocatorRequest);
+    assertThat(response).isNotNull().isInstanceOf(RemoteLocatorResponse.class);
+    assertThat(locatorMembershipListener.getAllLocatorsInfo()).isEmpty();
+    assertThat(locatorMembershipListener.getAllServerLocatorsInfo()).isEmpty();
+    assertThat(((RemoteLocatorResponse) 
response).getLocators()).isEqualTo(cluster1Locators);
+  }
+
+  @Test
+  public void 
handleRemoteLocatorJoinRequestShouldReturnAllKnownLocatorsAndUpdateInternalStructures()
 {
+    DistributionLocatorId locator1Site1 = buildDistributionLocatorId(10101);
+    RemoteLocatorJoinRequest locator1Site1JoinRequest =
+        new RemoteLocatorJoinRequest(1, locator1Site1, "");
+    DistributionLocatorId locator1Site2 = buildDistributionLocatorId(20201);
+    RemoteLocatorJoinRequest locator1Site2JoinRequest =
+        new RemoteLocatorJoinRequest(2, locator1Site2, "");
+    DistributionLocatorId locator2Site2 = buildDistributionLocatorId(20202);
+    RemoteLocatorJoinRequest locator2Site2JoinRequest =
+        new RemoteLocatorJoinRequest(2, locator2Site2, "");
+
+    // First locator from site 1.
+    Object response = 
locatorMembershipListener.handleRequest(locator1Site1JoinRequest);
+    
assertThat(response).isNotNull().isInstanceOf(RemoteLocatorJoinResponse.class);
+    assertThat(((RemoteLocatorJoinResponse) 
response).getLocators()).isNotNull().hasSize(1);
+    assertThat(((RemoteLocatorJoinResponse) 
response).getLocators().get(1).contains(locator1Site1))
+        .isTrue();
+
+    // Two locators from site 2.
+    locatorMembershipListener.handleRequest(locator1Site2JoinRequest);
+    response = 
locatorMembershipListener.handleRequest(locator2Site2JoinRequest);
+    
assertThat(response).isNotNull().isInstanceOf(RemoteLocatorJoinResponse.class);
+    assertThat(((RemoteLocatorJoinResponse) 
response).getLocators()).isNotNull().hasSize(2);
+    assertThat(((RemoteLocatorJoinResponse) 
response).getLocators().get(1).size()).isEqualTo(1);
+    assertThat(((RemoteLocatorJoinResponse) 
response).getLocators().get(1).contains(locator1Site1))
+        .isTrue();
+    assertThat(((RemoteLocatorJoinResponse) 
response).getLocators().get(2).size()).isEqualTo(2);
+    assertThat(((RemoteLocatorJoinResponse) 
response).getLocators().get(2).contains(locator1Site2))
+        .isTrue();
+    assertThat(((RemoteLocatorJoinResponse) 
response).getLocators().get(2).contains(locator2Site2))
+        .isTrue();
+  }
+
+  @Test
+  public void handleLocatorJoinMessageShouldUpdateInternalStructures()
+      throws InterruptedException, ExecutionException {
+    int clusters = 4;
+    int locatorsPerCluster = 6;
+    List<LocatorJoinMessage> allJoinMessages = new ArrayList<>();
+
+    for (int i = 1; i <= clusters; i++) {
+      allJoinMessages.addAll(buildPermutationsForClusterId(i, 
locatorsPerCluster));
+    }
+
+    Collection<Callable<Object>> requests = new ArrayList<>();
+    allJoinMessages.forEach(
+        (request) -> requests.add(new 
HandlerCallable(locatorMembershipListener, request)));
+
+    ExecutorService executorService = 
Executors.newFixedThreadPool(allJoinMessages.size());
+    List<Future<Object>> futures = executorService.invokeAll(requests);
+    for (Future future : futures) {
+      Object response = future.get();
+      assertThat(response).isNull();
+    }
+    executorService.shutdownNow();
+
+    
assertThat(locatorMembershipListener.getAllLocatorsInfo().size()).isEqualTo(clusters);
+    ConcurrentMap<Integer, Set<DistributionLocatorId>> locatorsPerClusterMap =
+        locatorMembershipListener.getAllLocatorsInfo();
+    locatorsPerClusterMap
+        .forEach((key, value) -> 
assertThat(value.size()).isEqualTo(locatorsPerCluster));
+  }
+
+  @Test
+  public void locatorJoinedShouldNotifyNobodyIfThereAreNoKnownLocators()
+      throws IOException, ClassNotFoundException, InterruptedException {
+    ConcurrentMap<Integer, Set<DistributionLocatorId>> allLocatorsInfo = new 
ConcurrentHashMap<>();
+    DistributionLocatorId joiningLocator = buildDistributionLocatorId(20202);
+    DistributionLocatorId locator1Site1 = buildDistributionLocatorId(10101);
+    
when(locatorMembershipListener.getAllLocatorsInfo()).thenReturn(allLocatorsInfo);
+
+    locatorMembershipListener.locatorJoined(2, joiningLocator, locator1Site1);
+    joinLocatorsDistributorThread();
+    verify(tcpClient, times(0)).requestToServer(any(InetSocketAddress.class),
+        any(LocatorJoinMessage.class), anyInt(), anyBoolean());
+  }
+
+  @Test
+  public void 
locatorJoinedShouldNotifyKnownLocatorAboutTheJoiningLocatorAndJoiningLocatorAboutTheKnownOne()
+      throws IOException, ClassNotFoundException, InterruptedException {
+    ConcurrentMap<Integer, Set<DistributionLocatorId>> allLocatorsInfo = new 
ConcurrentHashMap<>();
+    DistributionLocatorId joiningLocator = buildDistributionLocatorId(20202);
+    DistributionLocatorId locator1Site1 = buildDistributionLocatorId(10101);
+    DistributionLocatorId locator3Site3 = buildDistributionLocatorId(30303);
+    allLocatorsInfo.put(3, new 
HashSet<>(Collections.singletonList(locator3Site3)));
+    
when(locatorMembershipListener.getAllLocatorsInfo()).thenReturn(allLocatorsInfo);
+
+    locatorMembershipListener.locatorJoined(2, joiningLocator, locator1Site1);
+    joinLocatorsDistributorThread();
+    verifyMessagesSentBothWays(locator1Site1, 2, joiningLocator, 3, 
locator3Site3);
+  }
+
+  @Test
+  public void 
locatorJoinedShouldNotifyEveryKnownLocatorAboutTheJoiningLocatorAndJoiningLocatorAboutAllTheKnownLocators()
+      throws IOException, ClassNotFoundException, InterruptedException {
+    ConcurrentMap<Integer, Set<DistributionLocatorId>> allLocatorsInfo = new 
ConcurrentHashMap<>();
+    DistributionLocatorId joiningLocator = buildDistributionLocatorId(10102);
+    DistributionLocatorId locator1Site1 = buildDistributionLocatorId(10101);
+    DistributionLocatorId locator3Site1 = buildDistributionLocatorId(10103);
+    DistributionLocatorId locator1Site2 = buildDistributionLocatorId(20201);
+    DistributionLocatorId locator2Site2 = buildDistributionLocatorId(20202);
+    DistributionLocatorId locator1Site3 = buildDistributionLocatorId(30301);
+    DistributionLocatorId locator2Site3 = buildDistributionLocatorId(30302);
+    DistributionLocatorId locator3Site3 = buildDistributionLocatorId(30303);
+    allLocatorsInfo.put(1, new HashSet<>(Arrays.asList(locator1Site1, 
locator3Site1)));
+    allLocatorsInfo.put(2, new HashSet<>(Arrays.asList(locator1Site2, 
locator2Site2)));
+    allLocatorsInfo.put(3,
+        new HashSet<>(Arrays.asList(locator1Site3, locator2Site3, 
locator3Site3)));
+    
when(locatorMembershipListener.getAllLocatorsInfo()).thenReturn(allLocatorsInfo);
+
+    locatorMembershipListener.locatorJoined(1, joiningLocator, locator1Site1);
+    joinLocatorsDistributorThread();
+    verifyMessagesSentBothWays(locator1Site1, 1, joiningLocator, 1, 
locator3Site1);
+    verifyMessagesSentBothWays(locator1Site1, 1, joiningLocator, 2, 
locator1Site2);
+    verifyMessagesSentBothWays(locator1Site1, 1, joiningLocator, 2, 
locator2Site2);
+    verifyMessagesSentBothWays(locator1Site1, 1, joiningLocator, 3, 
locator1Site3);
+    verifyMessagesSentBothWays(locator1Site1, 1, joiningLocator, 3, 
locator2Site3);
+    verifyMessagesSentBothWays(locator1Site1, 1, joiningLocator, 3, 
locator3Site3);
+  }
+
+  @Test
+  public void 
locatorJoinedShouldRetryUpToTheConfiguredUpperBoundOnConnectionFailures()
+      throws IOException, ClassNotFoundException, InterruptedException {
+    ConcurrentMap<Integer, Set<DistributionLocatorId>> allLocatorsInfo = new 
ConcurrentHashMap<>();
+    DistributionLocatorId joiningLocator = buildDistributionLocatorId(10102);
+    DistributionLocatorId locator1Site1 = buildDistributionLocatorId(10101);
+    DistributionLocatorId locator3Site1 = buildDistributionLocatorId(10103);
+    allLocatorsInfo.put(1, new 
HashSet<>(Collections.singletonList(locator3Site1)));
+    
when(locatorMembershipListener.getAllLocatorsInfo()).thenReturn(allLocatorsInfo);
+    when(tcpClient.requestToServer(locator3Site1.getHost(),
+        new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
+        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false))
+            .thenThrow(new EOFException("Mock Exception"));
+
+    locatorMembershipListener.locatorJoined(1, joiningLocator, locator1Site1);
+    joinLocatorsDistributorThread();
+
+    verify(tcpClient, times(LOCATOR_DISTRIBUTION_RETRY_ATTEMPTS + 
1)).requestToServer(
+        locator3Site1.getHost(),
+        new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
+        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+    verify(tcpClient).requestToServer(joiningLocator.getHost(),
+        new LocatorJoinMessage(1, locator3Site1, locator1Site1, ""),
+        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+  }
+
+  @Test
+  public void 
locatorJoinedShouldNotRetryAgainAfterSuccessfulRetryOnConnectionFailures()
+      throws IOException, ClassNotFoundException, InterruptedException {
+    ConcurrentMap<Integer, Set<DistributionLocatorId>> allLocatorsInfo = new 
ConcurrentHashMap<>();
+    DistributionLocatorId joiningLocator = buildDistributionLocatorId(10102);
+    DistributionLocatorId locator1Site1 = buildDistributionLocatorId(10101);
+    DistributionLocatorId locator3Site1 = buildDistributionLocatorId(10103);
+    allLocatorsInfo.put(1, new 
HashSet<>(Collections.singletonList(locator3Site1)));
+    
when(locatorMembershipListener.getAllLocatorsInfo()).thenReturn(allLocatorsInfo);
+    when(tcpClient.requestToServer(locator3Site1.getHost(),
+        new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
+        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false))
+            .thenThrow(new EOFException("Mock Exception"))
+            .thenReturn(null);
+
+    locatorMembershipListener.locatorJoined(1, joiningLocator, locator1Site1);
+    joinLocatorsDistributorThread();
+
+    verify(tcpClient, times(2)).requestToServer(locator3Site1.getHost(),
+        new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
+        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+    verify(tcpClient).requestToServer(joiningLocator.getHost(),
+        new LocatorJoinMessage(1, locator3Site1, locator1Site1, ""),
+        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+  }
+
+  @Test
+  public void locatorJoinedShouldRetryOnlyFailedMessagesOnConnectionFailures()
+      throws IOException, ClassNotFoundException, InterruptedException {
+    ConcurrentMap<Integer, Set<DistributionLocatorId>> allLocatorsInfo = new 
ConcurrentHashMap<>();
+    DistributionLocatorId joiningLocator = buildDistributionLocatorId(10102);
+    DistributionLocatorId locator1Site1 = buildDistributionLocatorId(10101);
+    DistributionLocatorId locator3Site1 = buildDistributionLocatorId(10103);
+    DistributionLocatorId locator1Site2 = buildDistributionLocatorId(20201);
+    DistributionLocatorId locator1Site3 = buildDistributionLocatorId(30301);
+    allLocatorsInfo.put(1, new HashSet<>(Arrays.asList(locator1Site1, 
locator3Site1)));
+    allLocatorsInfo.put(2, new 
HashSet<>(Collections.singletonList(locator1Site2)));
+    allLocatorsInfo.put(3, new 
HashSet<>(Collections.singletonList(locator1Site3)));
+    
when(locatorMembershipListener.getAllLocatorsInfo()).thenReturn(allLocatorsInfo);
+
+    // Fail on first 2 attempts and succeed on third attempt.
+    when(tcpClient.requestToServer(locator3Site1.getHost(),
+        new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
+        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false))
+            .thenThrow(new EOFException("Mock Exception"))
+            .thenThrow(new EOFException("Mock Exception")).thenReturn(null);
+
+    // Fail always.
+    when(tcpClient.requestToServer(joiningLocator.getHost(),
+        new LocatorJoinMessage(3, locator1Site3, locator1Site1, ""),
+        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false))
+            .thenThrow(new EOFException("Mock Exception"));
+
+    locatorMembershipListener.locatorJoined(1, joiningLocator, locator1Site1);
+    joinLocatorsDistributorThread();
+
+    verifyMessagesSentBothWays(locator1Site1, 1, joiningLocator, 2, 
locator1Site2);
+    verify(tcpClient, times(3)).requestToServer(locator3Site1.getHost(),
+        new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
+        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+    verify(tcpClient).requestToServer(joiningLocator.getHost(),
+        new LocatorJoinMessage(1, locator3Site1, locator1Site1, ""),
+        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+    verify(tcpClient).requestToServer(locator1Site3.getHost(),
+        new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
+        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+    verify(tcpClient, times(LOCATOR_DISTRIBUTION_RETRY_ATTEMPTS + 
1)).requestToServer(
+        joiningLocator.getHost(),
+        new LocatorJoinMessage(3, locator1Site3, locator1Site1, ""),
+        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+  }
+
+  private static class HandlerCallable implements Callable<Object> {
+    private final Object request;
+    private final LocatorMembershipListenerImpl locatorMembershipListener;
+
+    HandlerCallable(LocatorMembershipListenerImpl locatorMembershipListener, 
Object request) {
+      this.request = request;
+      this.locatorMembershipListener = locatorMembershipListener;
+    }
+
+    @Override
+    public Object call() {
+      return locatorMembershipListener.handleRequest(request);
+    }
+  }
+}

Reply via email to