This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch feature/GEODE-6369
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-6369 by this 
push:
     new abad332  fixing problems in auto-reconnect
abad332 is described below

commit abad3325d5d1c121bc0a9ddf6d54ceb72e47c170
Author: Bruce Schuchardt <bschucha...@pivotal.io>
AuthorDate: Tue Feb 12 15:25:52 2019 -0800

    fixing problems in auto-reconnect
    
    * messages were being thrown away by the location service quorum checker
    during auto-reconnect.  Some of these were "join" messages that needed
    to be delivered to the new membership service
    
    * registrants weren't being removed from the recovered membership view
    in the locator.  This confused restarting nodes because the recovered
    membership view has stale info in it that they don't want to use
    
    * locator service restarts were hanging due to profile interchange being
    done under synchronization
---
 .../apache/geode/cache30/ReconnectDUnitTest.java   |   9 +-
 ...ReconnectWithClusterConfigurationDUnitTest.java | 103 +++++++++++++++------
 .../gms/membership/GMSJoinLeaveJUnitTest.java      |   2 +-
 .../gms/messenger/JGroupsMessengerJUnitTest.java   |   4 +-
 .../internal/InternalDistributedSystem.java        |   2 +-
 .../distributed/internal/InternalLocator.java      |  10 ++
 .../geode/distributed/internal/ServerLocator.java  |  10 +-
 .../membership/gms/fd/GMSHealthMonitor.java        |   2 +-
 .../membership/gms/locator/GMSLocator.java         |   9 +-
 .../membership/gms/messenger/GMSQuorumChecker.java |  10 +-
 .../membership/gms/messenger/JGroupsMessenger.java |  43 ++++++++-
 .../gms/messenger/MembershipInformation.java       |  11 ++-
 .../membership/gms/mgr/GMSMembershipManager.java   |   8 +-
 .../distributed/internal/tcpserver/TcpHandler.java |   5 +
 .../distributed/internal/tcpserver/TcpServer.java  |   4 +
 15 files changed, 186 insertions(+), 46 deletions(-)

diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/cache30/ReconnectDUnitTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/cache30/ReconnectDUnitTest.java
index 3eab053..8c7e602 100755
--- 
a/geode-core/src/distributedTest/java/org/apache/geode/cache30/ReconnectDUnitTest.java
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/cache30/ReconnectDUnitTest.java
@@ -1075,7 +1075,8 @@ public class ReconnectDUnitTest extends 
JUnit4CacheTestCase {
         ReconnectDUnitTest.savedCache = (GemFireCacheImpl) getCache();
         Region myRegion = createRegion("myRegion", createAtts());
         myRegion.put("MyKey", "MyValue");
-        myRegion.getAttributesMutator().addCacheListener(new 
CacheKillingListener());
+        myRegion.getAttributesMutator()
+            .addCacheListener(new CacheListenerTriggeringForcedDisconnect());
       }
     };
 
@@ -1327,10 +1328,12 @@ public class ReconnectDUnitTest extends 
JUnit4CacheTestCase {
   }
 
   /**
-   * CacheKillingListener crashes the distributed system when it is invoked 
for the first time.
+   * CacheListenerTriggeringForcedDisconnect crashes the distributed system 
when it is invoked for
+   * the first time.
    * After that it ignores any notifications.
    */
-  public static class CacheKillingListener extends CacheListenerAdapter 
implements Declarable {
+  public static class CacheListenerTriggeringForcedDisconnect extends 
CacheListenerAdapter
+      implements Declarable {
     public static int crashCount = 0;
 
     @Override
diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/cache30/ReconnectWithClusterConfigurationDUnitTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/cache30/ReconnectWithClusterConfigurationDUnitTest.java
index e535d70..2afc077 100644
--- 
a/geode-core/src/distributedTest/java/org/apache/geode/cache30/ReconnectWithClusterConfigurationDUnitTest.java
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/cache30/ReconnectWithClusterConfigurationDUnitTest.java
@@ -25,10 +25,13 @@ import static 
org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
 import static 
org.apache.geode.distributed.ConfigurationProperties.MEMBER_TIMEOUT;
 import static org.apache.geode.distributed.ConfigurationProperties.NAME;
 import static 
org.apache.geode.distributed.ConfigurationProperties.USE_CLUSTER_CONFIGURATION;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
@@ -41,8 +44,11 @@ import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.distributed.Locator;
+import 
org.apache.geode.distributed.internal.InternalConfigurationPersistenceService;
 import org.apache.geode.distributed.internal.InternalLocator;
 import 
org.apache.geode.distributed.internal.membership.gms.MembershipManagerHelper;
+import org.apache.geode.internal.AvailablePort;
+import org.apache.geode.internal.AvailablePortHelper;
 import org.apache.geode.test.awaitility.GeodeAwaitility;
 import org.apache.geode.test.dunit.Assert;
 import org.apache.geode.test.dunit.AsyncInvocation;
@@ -53,40 +59,62 @@ import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.dunit.rules.DistributedRule;
 
 public class ReconnectWithClusterConfigurationDUnitTest implements 
Serializable {
-  static final int NUM_VMS = 2;
+  static final int NUM_LOCATORS = 2;
+  static final int NUM_VMS = 4;
   static DistributedSystem system;
   static Cache cache;
-  static int locatorPort;
+  static int[] locatorPorts = new int[NUM_LOCATORS];
   static Properties dsProperties;
 
   @Rule
   public DistributedRule distributedRule = 
DistributedRule.builder().withVMCount(NUM_VMS).build();
 
   @Before
-  public void setup() throws Exception {
-    locatorPort = (int) VM.getVM(0).invoke("start locator", () -> {
-      try {
-        Disconnect.disconnectFromDS();
-        dsProperties = null;
-        Properties props = getDistributedSystemProperties();
-        dsProperties.remove(LOCATORS);
-        locator = Locator.startLocatorAndDS(0, new File(""), props);
-        system = locator.getDistributedSystem();
-        cache = ((InternalLocator) locator).getCache();
-        ReconnectDUnitTest.savedSystem = locator.getDistributedSystem();
-        IgnoredException.addIgnoredException(
-            "org.apache.geode.ForcedDisconnectException||Possible loss of 
quorum");
-      } catch (IOException e) {
-        Assert.fail("unable to start locator", e);
-      }
-      return locator.getPort();
-    });
-    final int locPort = locatorPort;
-    Invoke.invokeInEveryVM("set locator port", () -> locatorPort = locPort);
+  public void setup() {
+    List<AvailablePort.Keeper> randomAvailableTCPPortKeepers =
+        AvailablePortHelper.getRandomAvailableTCPPortKeepers(NUM_LOCATORS);
+    for (int i = 0; i < NUM_LOCATORS; i++) {
+      AvailablePort.Keeper keeper = randomAvailableTCPPortKeepers.get(i);
+      locatorPorts[i] = keeper.getPort();
+    }
+    final int[] locPorts = locatorPorts;
+    Invoke.invokeInEveryVM("set locator ports", () -> locatorPorts = locPorts);
+    for (int i = 0; i < NUM_LOCATORS; i++) {
+      final int locatorNumber = i;
+      randomAvailableTCPPortKeepers.get(locatorNumber).release();
+      VM.getVM(i).invoke("start locator", () -> {
+        try {
+          Disconnect.disconnectFromDS();
+          dsProperties = null;
+          Properties props = getDistributedSystemProperties();
+          locator = Locator.startLocatorAndDS(locatorPorts[locatorNumber], new 
File(""), props);
+          system = locator.getDistributedSystem();
+          cache = ((InternalLocator) locator).getCache();
+          ReconnectDUnitTest.savedSystem = locator.getDistributedSystem();
+          IgnoredException.addIgnoredException(
+              "org.apache.geode.ForcedDisconnectException||Possible loss of 
quorum");
+        } catch (IOException e) {
+          Assert.fail("unable to start locator", e);
+        }
+      });
+    }
   }
 
   @After
   public void teardown() {
+    for (int i = 0; i < NUM_LOCATORS; i++) {
+      VM.getVM(i).invoke(() -> {
+        InternalLocator locator = InternalLocator.getLocator();
+        if (locator != null) {
+          InternalConfigurationPersistenceService sharedConfig =
+              locator.getConfigurationPersistenceService();
+          if (sharedConfig != null) {
+            sharedConfig.destroySharedConfiguration();
+          }
+          locator.stop();
+        }
+      });
+    }
     Invoke.invokeInEveryVM(() -> {
       if (system != null) {
         system.disconnect();
@@ -98,16 +126,30 @@ public class ReconnectWithClusterConfigurationDUnitTest 
implements Serializable
 
   public Properties getDistributedSystemProperties() {
     dsProperties = new Properties();
-    dsProperties.put(MAX_WAIT_TIME_RECONNECT, "10000");
+    dsProperties.put(MAX_WAIT_TIME_RECONNECT, "20000");
     dsProperties.put(ENABLE_NETWORK_PARTITION_DETECTION, "true");
     dsProperties.put(DISABLE_AUTO_RECONNECT, "false");
     dsProperties.put(ENABLE_CLUSTER_CONFIGURATION, "true");
     dsProperties.put(USE_CLUSTER_CONFIGURATION, "true");
-    dsProperties.put(LOCATORS, "localHost[" + locatorPort + "]");
+    StringBuilder stringBuilder = new StringBuilder();
+    stringBuilder.append("localHost[")
+        .append(locatorPorts[0])
+        .append(']');
+    for (int i = 1; i < NUM_LOCATORS; i++) {
+      stringBuilder.append(",localHost[")
+          .append(locatorPorts[0])
+          .append(']');
+    }
+    dsProperties.put(LOCATORS, stringBuilder.toString());
     dsProperties.put(MCAST_PORT, "0");
     dsProperties.put(MEMBER_TIMEOUT, "5000");
     dsProperties.put(LOG_LEVEL, "info");
-    dsProperties.put(NAME, "vm" + VM.getCurrentVMNum());
+    int vmNumber = VM.getCurrentVMNum();
+    if (vmNumber < NUM_LOCATORS) {
+      dsProperties.put(NAME, "loc" + VM.getCurrentVMNum());
+    } else {
+      dsProperties.put(NAME, "vm" + VM.getCurrentVMNum());
+    }
     return dsProperties;
   }
 
@@ -115,7 +157,7 @@ public class ReconnectWithClusterConfigurationDUnitTest 
implements Serializable
   @Test
   public void testReconnectAfterMeltdown() throws InterruptedException {
 
-    for (int i = 1; i < NUM_VMS; i++) {
+    for (int i = NUM_LOCATORS; i < NUM_VMS; i++) {
       VM.getVM(i).invoke("create cache", () -> {
         cache = new CacheFactory(getDistributedSystemProperties()).create();
         system = cache.getDistributedSystem();
@@ -129,14 +171,21 @@ public class ReconnectWithClusterConfigurationDUnitTest 
implements Serializable
     for (AsyncInvocation crasher : crashers) {
       crasher.join();
     }
+    AsyncInvocation[] waiters = new AsyncInvocation[NUM_VMS];
     for (int i = NUM_VMS - 1; i >= 0; i--) {
-      VM.getVM(i).invoke("wait for reconnect", () -> {
+      waiters[i] = VM.getVM(i).invokeAsync("wait for reconnect", () -> {
         
system.waitUntilReconnected(GeodeAwaitility.getTimeout().getValueInMS(),
             TimeUnit.MILLISECONDS);
         system = system.getReconnectedSystem();
         cache = cache.getReconnectedCache();
+        await().untilAsserted(() -> 
assertThat(system.getAllOtherMembers().size())
+            .withFailMessage("wrong number of members: " + 
system.getAllOtherMembers())
+            .isEqualTo(NUM_VMS - 1));
       });
     }
+    for (AsyncInvocation waiter : waiters) {
+      waiter.join();
+    }
   }
 
 }
diff --git 
a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
 
b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
index 1fbfc6d..1aebc41 100644
--- 
a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
+++ 
b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
@@ -622,7 +622,7 @@ public class GMSJoinLeaveJUnitTest {
     previousMemberId.setVmViewId(0);
     NetView view = new NetView(mockMembers[0], 1,
         createMemberList(mockMembers[0], previousMemberId, mockMembers[1]));
-    InstallViewMessage viewMessage = new InstallViewMessage(view, 0, true);
+    InstallViewMessage viewMessage = new InstallViewMessage(view, 0, false);
     viewMessage.setSender(mockMembers[0]);
     gmsJoinLeave.processMessage(viewMessage);
     assertEquals(0, gmsJoinLeaveMemberId.getVmViewId());
diff --git 
a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
 
b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
index 274b558..d5239e8 100755
--- 
a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
+++ 
b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
@@ -52,6 +52,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.commons.lang3.SerializationException;
 import org.jgroups.Address;
@@ -877,7 +878,8 @@ public class JGroupsMessengerJUnitTest {
     initMocks(false);
     JChannel channel = messenger.myChannel;
     services.getConfig().getTransport().setOldDSMembershipInfo(new 
MembershipInformation(channel,
-        Collections.singleton(new InternalDistributedMember("localhost", 
10000))));
+        Collections.singleton(new InternalDistributedMember("localhost", 
10000)),
+        new ConcurrentLinkedQueue<>()));
     JGroupsMessenger newMessenger = new JGroupsMessenger();
     newMessenger.init(services);
     newMessenger.start();
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
index 3a3debf..8baaf7f 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
@@ -1262,7 +1262,7 @@ public class InternalDistributedSystem extends 
DistributedSystem
     boolean isForcedDisconnect = dm.getRootCause() instanceof 
ForcedDisconnectException;
     boolean rejoined = false;
     this.reconnected = false;
-    if (isForcedDisconnect) {
+    if (isForcedDisconnect && !this.isReconnectingDS) {
       this.forcedDisconnect = true;
       resetReconnectAttemptCounter();
       rejoined = tryReconnect(true, reason, GemFireCacheImpl.getInstance());
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
index 579b011..2d1d401 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
@@ -1112,6 +1112,7 @@ public class InternalLocator extends Locator implements 
ConnectListener, LogConf
       endStartLocator(this.myDs);
       logger.info("Locator restart completed");
     }
+    this.server.restartCompleted(newSystem);
   }
 
   public ClusterManagementService getClusterManagementService() {
@@ -1265,6 +1266,15 @@ public class InternalLocator extends Locator implements 
ConnectListener, LogConf
     }
 
     @Override
+    public void restartCompleted(DistributedSystem ds) {
+      if (ds != null) {
+        for (TcpHandler handler : this.allHandlers) {
+          handler.restartCompleted(ds);
+        }
+      }
+    }
+
+    @Override
     public Object processRequest(Object request) throws IOException {
       long giveup = 0;
       while (giveup == 0 || System.currentTimeMillis() < giveup) {
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocator.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocator.java
index f5dcd52..fbea015 100755
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocator.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocator.java
@@ -288,10 +288,12 @@ public class ServerLocator implements TcpHandler, 
DistributionAdvisee {
       this.loadSnapshot = new LocatorLoadSnapshot();
       this.ds = (InternalDistributedSystem) ds;
       this.advisor = ControllerAdvisor.createControllerAdvisor(this); // 
escapes constructor but
-                                                                      // 
allows field to be final
-      if (ds.isConnected()) {
-        this.advisor.handshake(); // GEODE-1393: need to get server 
information during restart
-      }
+    }
+  }
+
+  public void restartCompleted(DistributedSystem ds) {
+    if (ds.isConnected()) {
+      this.advisor.handshake(); // GEODE-1393: need to get server information 
during restart
     }
   }
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index 22e3b73..6991efb 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -1062,7 +1062,7 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
     // only respond if the intended recipient is this member
     InternalDistributedMember me = localAddress;
 
-    if (me.getVmViewId() >= 0 && m.getTarget().equals(me)) {
+    if (me == null || me.getVmViewId() >= 0 && m.getTarget().equals(me)) {
       HeartbeatMessage hm = new HeartbeatMessage(m.getRequestId());
       hm.setRecipient(m.getSender());
       Set<InternalDistributedMember> membersNotReceivedMsg = 
services.getMessenger().send(hm);
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocator.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocator.java
index 659797a..407cff1 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocator.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocator.java
@@ -178,6 +178,10 @@ public class GMSLocator implements Locator, NetLocator {
 
   @Override
   public void setIsCoordinator(boolean isCoordinator) {
+    if (isCoordinator) {
+      logger.info("Location services has received notification that this node 
is becoming"
+          + " membership coordinator");
+    }
     this.isCoordinator = isCoordinator;
   }
 
@@ -250,6 +254,9 @@ public class GMSLocator implements Locator, NetLocator {
 
     synchronized (registrants) {
       registrants.add(findRequest.getMemberID());
+      if (recoveredView != null) {
+        recoveredView.remove(findRequest.getMemberID());
+      }
     }
 
     if (v != null) {
@@ -299,9 +306,7 @@ public class GMSLocator implements Locator, NetLocator {
     synchronized (registrants) {
       if (isCoordinator) {
         coordinator = localAddress;
-
         if (v != null && localAddress != null && 
!localAddress.equals(v.getCoordinator())) {
-          logger.info("This member is becoming coordinator since view {}", v);
           v = null;
           fromView = false;
         }
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSQuorumChecker.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSQuorumChecker.java
index 50b803d..14adc8d 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSQuorumChecker.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSQuorumChecker.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.logging.log4j.Logger;
 import org.jgroups.Address;
@@ -55,6 +56,7 @@ public class GMSQuorumChecker implements QuorumChecker {
   private JGAddress myAddress;
   private final long partitionThreshold;
   private Set<DistributedMember> oldDistributedMemberIdentifiers;
+  private ConcurrentLinkedQueue<Message> messageQueue = new 
ConcurrentLinkedQueue<>();
 
   public GMSQuorumChecker(NetView jgView, int partitionThreshold, JChannel 
channel,
       Set<DistributedMember> oldDistributedMemberIdentifiers) {
@@ -125,7 +127,7 @@ public class GMSQuorumChecker implements QuorumChecker {
 
   @Override
   public MembershipInformation getMembershipInfo() {
-    return new MembershipInformation(channel, oldDistributedMemberIdentifiers);
+    return new MembershipInformation(channel, oldDistributedMemberIdentifiers, 
messageQueue);
   }
 
   private boolean calculateQuorum() {
@@ -219,9 +221,15 @@ public class GMSQuorumChecker implements QuorumChecker {
         }
       } else if (pingPonger.isPongMessage(msgBytes)) {
         pongReceived(msg.getSrc());
+      } else {
+        queueMessage(msg);
       }
     }
 
+    private void queueMessage(Message msg) {
+      messageQueue.add(msg);
+    }
+
     @Override
     public void getState(OutputStream output) throws Exception {}
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index 78ceba2..cc0b1cc 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -38,6 +38,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -85,6 +86,7 @@ import 
org.apache.geode.distributed.internal.membership.NetView;
 import org.apache.geode.distributed.internal.membership.QuorumChecker;
 import org.apache.geode.distributed.internal.membership.gms.GMSMember;
 import org.apache.geode.distributed.internal.membership.gms.Services;
+import 
org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor;
 import 
org.apache.geode.distributed.internal.membership.gms.interfaces.MessageHandler;
 import 
org.apache.geode.distributed.internal.membership.gms.interfaces.Messenger;
 import 
org.apache.geode.distributed.internal.membership.gms.locator.FindCoordinatorRequest;
@@ -171,6 +173,19 @@ public class JGroupsMessenger implements Messenger {
    */
   private Set<DistributedMember> usedDistributedMemberIdentifiers = new 
HashSet<>();
 
+  /**
+   * During reconnect a QuorumChecker holds the JGroups channel and responds 
to Ping
+   * and Pong messages but also queues any messages it doesn't recognize. 
These need
+   * to be delivered to handlers after membership services have been rebuilt.
+   */
+  private Queue<Message> queuedMessagesFromReconnect;
+
+  /**
+   * The JGroupsReceiver is handed messages by the JGroups Channel. It is 
responsible
+   * for deserializating and dispatching those messages to the appropriate 
handler
+   */
+  private JGroupsReceiver jgroupsReceiver;
+
   @Override
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
       value = "ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD")
@@ -307,6 +322,7 @@ public class JGroupsMessenger implements Messenger {
         MembershipInformation oldInfo = (MembershipInformation) 
oldDSMembershipInfo;
         myChannel = oldInfo.getChannel();
         usedDistributedMemberIdentifiers = oldInfo.getMembershipIdentifiers();
+        queuedMessagesFromReconnect = oldInfo.getQueuedMessages();
 
         // scrub the old channel
         ViewId vid = new ViewId(new JGAddress(), 0);
@@ -343,7 +359,8 @@ public class JGroupsMessenger implements Messenger {
 
     try {
       myChannel.setReceiver(null);
-      myChannel.setReceiver(new JGroupsReceiver());
+      jgroupsReceiver = new JGroupsReceiver();
+      myChannel.setReceiver(jgroupsReceiver);
       if (!reconnecting) {
         myChannel.connect("AG"); // apache g***** (whatever we end up calling 
it)
       }
@@ -385,7 +402,17 @@ public class JGroupsMessenger implements Messenger {
   }
 
   @Override
-  public void started() {}
+  public void started() {
+    if (queuedMessagesFromReconnect != null) {
+      logger.info("Delivering {} messages queued by quorum checker",
+          queuedMessagesFromReconnect.size());
+      for (Message message : queuedMessagesFromReconnect) {
+        jgroupsReceiver.receive(message, true);
+      }
+      queuedMessagesFromReconnect.clear();
+      queuedMessagesFromReconnect = null;
+    }
+  }
 
   @Override
   public void stop() {
@@ -1224,6 +1251,10 @@ public class JGroupsMessenger implements Messenger {
 
     @Override
     public void receive(Message jgmsg) {
+      receive(jgmsg, false);
+    }
+
+    private void receive(Message jgmsg, boolean fromQuorumChecker) {
       long startTime = DistributionStats.getStatTime();
       try {
         if (services.getManager().shutdownInProgress()) {
@@ -1277,7 +1308,13 @@ public class JGroupsMessenger implements Messenger {
             logger.trace("JGroupsMessenger dispatching {} from {}", msg, 
msg.getSender());
           }
           filterIncomingMessage(msg);
-          getMessageHandler(msg).processMessage(msg);
+          MessageHandler handler = getMessageHandler(msg);
+          if (fromQuorumChecker && handler instanceof HealthMonitor) {
+            // ignore suspect / heartbeat messages that happened during
+            // auto-reconnect because they very likely have old member IDs in 
them
+          } else {
+            handler.processMessage(msg);
+          }
 
           // record the scheduling of broadcast messages
           NakAckHeader2 header = (NakAckHeader2) 
jgmsg.getHeader(nackack2HeaderId);
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/MembershipInformation.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/MembershipInformation.java
index adcfc43..80bc6e7 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/MembershipInformation.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/MembershipInformation.java
@@ -14,9 +14,11 @@
  */
 package org.apache.geode.distributed.internal.membership.gms.messenger;
 
+import java.util.Queue;
 import java.util.Set;
 
 import org.jgroups.JChannel;
+import org.jgroups.Message;
 
 import org.apache.geode.distributed.DistributedMember;
 
@@ -27,12 +29,15 @@ import org.apache.geode.distributed.DistributedMember;
 public class MembershipInformation {
   private final JChannel channel;
   private final Set<DistributedMember> membershipIdentifiers;
+  private final Queue<Message> queuedMessages;
 
   protected MembershipInformation(JChannel channel,
-      Set<DistributedMember> oldMembershipIdentifiers) {
+      Set<DistributedMember> oldMembershipIdentifiers,
+      Queue<Message> queuedMessages) {
 
     this.channel = channel;
     this.membershipIdentifiers = oldMembershipIdentifiers;
+    this.queuedMessages = queuedMessages;
   }
 
   public JChannel getChannel() {
@@ -42,4 +47,8 @@ public class MembershipInformation {
   public Set<DistributedMember> getMembershipIdentifiers() {
     return membershipIdentifiers;
   }
+
+  public Queue<Message> getQueuedMessages() {
+    return this.queuedMessages;
+  }
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
index a49ad87..c998374 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
@@ -2560,11 +2560,17 @@ public class GMSMembershipManager implements 
MembershipManager, Manager {
           shutdownCause);
     }
 
+    if (this.isReconnectingDS()) {
+      logger.info("Reconnecting system failed to connect");
+      uncleanShutdown(reason,
+          new ForcedDisconnectException("reconnecting system failed to 
connect"));
+      return;
+    }
+
     if 
(!services.getConfig().getDistributionConfig().getDisableAutoReconnect()) {
       saveCacheXmlForReconnect();
     }
 
-
     Thread reconnectThread = new LoggingThread("DisconnectThread", false, () 
-> {
       // stop server locators immediately since they may not have correct
       // information. This has caused client failures in bridge/wan
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpHandler.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpHandler.java
index 424b3e4..1d19bf5 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpHandler.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpHandler.java
@@ -51,6 +51,11 @@ public interface TcpHandler {
       InternalConfigurationPersistenceService sharedConfig);
 
   /**
+   * Informs the handler that restart has completed
+   */
+  default void restartCompleted(DistributedSystem ds) {}
+
+  /**
    * Initialize the handler with the TcpServer. Called before the TcpServer 
starts accepting
    * connections.
    */
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
index 7b41d00..e21697a 100755
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
@@ -199,6 +199,10 @@ public class TcpServer {
         + System.identityHashCode(this.serverThread) + ";alive=" + 
this.serverThread.isAlive());
   }
 
+  public void restartCompleted(InternalDistributedSystem ds) {
+    this.handler.restartCompleted(ds);
+  }
+
   public void start() throws IOException {
     this.shuttingDown = false;
     startServerThread();

Reply via email to