This is an automated email from the ASF dual-hosted git repository. bschuchardt pushed a commit to branch feature/GEODE-6369 in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-6369 by this push: new abad332 fixing problems in auto-reconnect abad332 is described below commit abad3325d5d1c121bc0a9ddf6d54ceb72e47c170 Author: Bruce Schuchardt <bschucha...@pivotal.io> AuthorDate: Tue Feb 12 15:25:52 2019 -0800 fixing problems in auto-reconnect * messages were being thrown away by the location service quorum checker during auto-reconnect. Some of these were "join" messages that needed to be delivered to the new membership service * registrants weren't being removed from the recovered membership view in the locator. This confused restarting nodes because the recovered membership view contains stale information that they should not use * locator service restarts were hanging because the profile interchange was being done under synchronization --- .../apache/geode/cache30/ReconnectDUnitTest.java | 9 +- ...ReconnectWithClusterConfigurationDUnitTest.java | 103 +++++++++++++++------ .../gms/membership/GMSJoinLeaveJUnitTest.java | 2 +- .../gms/messenger/JGroupsMessengerJUnitTest.java | 4 +- .../internal/InternalDistributedSystem.java | 2 +- .../distributed/internal/InternalLocator.java | 10 ++ .../geode/distributed/internal/ServerLocator.java | 10 +- .../membership/gms/fd/GMSHealthMonitor.java | 2 +- .../membership/gms/locator/GMSLocator.java | 9 +- .../membership/gms/messenger/GMSQuorumChecker.java | 10 +- .../membership/gms/messenger/JGroupsMessenger.java | 43 ++++++++- .../gms/messenger/MembershipInformation.java | 11 ++- .../membership/gms/mgr/GMSMembershipManager.java | 8 +- .../distributed/internal/tcpserver/TcpHandler.java | 5 + .../distributed/internal/tcpserver/TcpServer.java | 4 + 15 files changed, 186 insertions(+), 46 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache30/ReconnectDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache30/ReconnectDUnitTest.java index 3eab053..8c7e602 100755 --- 
a/geode-core/src/distributedTest/java/org/apache/geode/cache30/ReconnectDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/cache30/ReconnectDUnitTest.java @@ -1075,7 +1075,8 @@ public class ReconnectDUnitTest extends JUnit4CacheTestCase { ReconnectDUnitTest.savedCache = (GemFireCacheImpl) getCache(); Region myRegion = createRegion("myRegion", createAtts()); myRegion.put("MyKey", "MyValue"); - myRegion.getAttributesMutator().addCacheListener(new CacheKillingListener()); + myRegion.getAttributesMutator() + .addCacheListener(new CacheListenerTriggeringForcedDisconnect()); } }; @@ -1327,10 +1328,12 @@ public class ReconnectDUnitTest extends JUnit4CacheTestCase { } /** - * CacheKillingListener crashes the distributed system when it is invoked for the first time. + * CacheListenerTriggeringForcedDisconnect crashes the distributed system when it is invoked for + * the first time. * After that it ignores any notifications. */ - public static class CacheKillingListener extends CacheListenerAdapter implements Declarable { + public static class CacheListenerTriggeringForcedDisconnect extends CacheListenerAdapter + implements Declarable { public static int crashCount = 0; @Override diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache30/ReconnectWithClusterConfigurationDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache30/ReconnectWithClusterConfigurationDUnitTest.java index e535d70..2afc077 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/cache30/ReconnectWithClusterConfigurationDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/cache30/ReconnectWithClusterConfigurationDUnitTest.java @@ -25,10 +25,13 @@ import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; import static org.apache.geode.distributed.ConfigurationProperties.MEMBER_TIMEOUT; import static org.apache.geode.distributed.ConfigurationProperties.NAME; import static 
org.apache.geode.distributed.ConfigurationProperties.USE_CLUSTER_CONFIGURATION; +import static org.apache.geode.test.awaitility.GeodeAwaitility.await; +import static org.assertj.core.api.Assertions.assertThat; import java.io.File; import java.io.IOException; import java.io.Serializable; +import java.util.List; import java.util.Properties; import java.util.concurrent.TimeUnit; @@ -41,8 +44,11 @@ import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheFactory; import org.apache.geode.distributed.DistributedSystem; import org.apache.geode.distributed.Locator; +import org.apache.geode.distributed.internal.InternalConfigurationPersistenceService; import org.apache.geode.distributed.internal.InternalLocator; import org.apache.geode.distributed.internal.membership.gms.MembershipManagerHelper; +import org.apache.geode.internal.AvailablePort; +import org.apache.geode.internal.AvailablePortHelper; import org.apache.geode.test.awaitility.GeodeAwaitility; import org.apache.geode.test.dunit.Assert; import org.apache.geode.test.dunit.AsyncInvocation; @@ -53,40 +59,62 @@ import org.apache.geode.test.dunit.VM; import org.apache.geode.test.dunit.rules.DistributedRule; public class ReconnectWithClusterConfigurationDUnitTest implements Serializable { - static final int NUM_VMS = 2; + static final int NUM_LOCATORS = 2; + static final int NUM_VMS = 4; static DistributedSystem system; static Cache cache; - static int locatorPort; + static int[] locatorPorts = new int[NUM_LOCATORS]; static Properties dsProperties; @Rule public DistributedRule distributedRule = DistributedRule.builder().withVMCount(NUM_VMS).build(); @Before - public void setup() throws Exception { - locatorPort = (int) VM.getVM(0).invoke("start locator", () -> { - try { - Disconnect.disconnectFromDS(); - dsProperties = null; - Properties props = getDistributedSystemProperties(); - dsProperties.remove(LOCATORS); - locator = Locator.startLocatorAndDS(0, new File(""), props); - system = 
locator.getDistributedSystem(); - cache = ((InternalLocator) locator).getCache(); - ReconnectDUnitTest.savedSystem = locator.getDistributedSystem(); - IgnoredException.addIgnoredException( - "org.apache.geode.ForcedDisconnectException||Possible loss of quorum"); - } catch (IOException e) { - Assert.fail("unable to start locator", e); - } - return locator.getPort(); - }); - final int locPort = locatorPort; - Invoke.invokeInEveryVM("set locator port", () -> locatorPort = locPort); + public void setup() { + List<AvailablePort.Keeper> randomAvailableTCPPortKeepers = + AvailablePortHelper.getRandomAvailableTCPPortKeepers(NUM_LOCATORS); + for (int i = 0; i < NUM_LOCATORS; i++) { + AvailablePort.Keeper keeper = randomAvailableTCPPortKeepers.get(i); + locatorPorts[i] = keeper.getPort(); + } + final int[] locPorts = locatorPorts; + Invoke.invokeInEveryVM("set locator ports", () -> locatorPorts = locPorts); + for (int i = 0; i < NUM_LOCATORS; i++) { + final int locatorNumber = i; + randomAvailableTCPPortKeepers.get(locatorNumber).release(); + VM.getVM(i).invoke("start locator", () -> { + try { + Disconnect.disconnectFromDS(); + dsProperties = null; + Properties props = getDistributedSystemProperties(); + locator = Locator.startLocatorAndDS(locatorPorts[locatorNumber], new File(""), props); + system = locator.getDistributedSystem(); + cache = ((InternalLocator) locator).getCache(); + ReconnectDUnitTest.savedSystem = locator.getDistributedSystem(); + IgnoredException.addIgnoredException( + "org.apache.geode.ForcedDisconnectException||Possible loss of quorum"); + } catch (IOException e) { + Assert.fail("unable to start locator", e); + } + }); + } } @After public void teardown() { + for (int i = 0; i < NUM_LOCATORS; i++) { + VM.getVM(i).invoke(() -> { + InternalLocator locator = InternalLocator.getLocator(); + if (locator != null) { + InternalConfigurationPersistenceService sharedConfig = + locator.getConfigurationPersistenceService(); + if (sharedConfig != null) { + 
sharedConfig.destroySharedConfiguration(); + } + locator.stop(); + } + }); + } Invoke.invokeInEveryVM(() -> { if (system != null) { system.disconnect(); @@ -98,16 +126,30 @@ public class ReconnectWithClusterConfigurationDUnitTest implements Serializable public Properties getDistributedSystemProperties() { dsProperties = new Properties(); - dsProperties.put(MAX_WAIT_TIME_RECONNECT, "10000"); + dsProperties.put(MAX_WAIT_TIME_RECONNECT, "20000"); dsProperties.put(ENABLE_NETWORK_PARTITION_DETECTION, "true"); dsProperties.put(DISABLE_AUTO_RECONNECT, "false"); dsProperties.put(ENABLE_CLUSTER_CONFIGURATION, "true"); dsProperties.put(USE_CLUSTER_CONFIGURATION, "true"); - dsProperties.put(LOCATORS, "localHost[" + locatorPort + "]"); + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append("localHost[") + .append(locatorPorts[0]) + .append(']'); + for (int i = 1; i < NUM_LOCATORS; i++) { + stringBuilder.append(",localHost[") + .append(locatorPorts[i]) + .append(']'); + } dsProperties.put(LOCATORS, stringBuilder.toString()); dsProperties.put(MCAST_PORT, "0"); dsProperties.put(MEMBER_TIMEOUT, "5000"); dsProperties.put(LOG_LEVEL, "info"); - dsProperties.put(NAME, "vm" + VM.getCurrentVMNum()); + int vmNumber = VM.getCurrentVMNum(); + if (vmNumber < NUM_LOCATORS) { + dsProperties.put(NAME, "loc" + VM.getCurrentVMNum()); + } else { + dsProperties.put(NAME, "vm" + VM.getCurrentVMNum()); + } return dsProperties; } @@ -115,7 +157,7 @@ public class ReconnectWithClusterConfigurationDUnitTest implements Serializable @Test public void testReconnectAfterMeltdown() throws InterruptedException { - for (int i = 1; i < NUM_VMS; i++) { + for (int i = NUM_LOCATORS; i < NUM_VMS; i++) { VM.getVM(i).invoke("create cache", () -> { cache = new CacheFactory(getDistributedSystemProperties()).create(); system = cache.getDistributedSystem(); @@ -129,14 +171,21 @@ public class ReconnectWithClusterConfigurationDUnitTest implements Serializable for 
(AsyncInvocation crasher : crashers)
{ crasher.join(); } + AsyncInvocation[] waiters = new AsyncInvocation[NUM_VMS]; for (int i = NUM_VMS - 1; i >= 0; i--) { - VM.getVM(i).invoke("wait for reconnect", () -> { + waiters[i] = VM.getVM(i).invokeAsync("wait for reconnect", () -> { system.waitUntilReconnected(GeodeAwaitility.getTimeout().getValueInMS(), TimeUnit.MILLISECONDS); system = system.getReconnectedSystem(); cache = cache.getReconnectedCache(); + await().untilAsserted(() -> assertThat(system.getAllOtherMembers().size()) + .withFailMessage("wrong number of members: " + system.getAllOtherMembers()) + .isEqualTo(NUM_VMS - 1)); }); } + for (AsyncInvocation waiter : waiters) { + waiter.join(); + } } } diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java index 1fbfc6d..1aebc41 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java @@ -622,7 +622,7 @@ public class GMSJoinLeaveJUnitTest { previousMemberId.setVmViewId(0); NetView view = new NetView(mockMembers[0], 1, createMemberList(mockMembers[0], previousMemberId, mockMembers[1])); - InstallViewMessage viewMessage = new InstallViewMessage(view, 0, true); + InstallViewMessage viewMessage = new InstallViewMessage(view, 0, false); viewMessage.setSender(mockMembers[0]); gmsJoinLeave.processMessage(viewMessage); assertEquals(0, gmsJoinLeaveMemberId.getVmViewId()); diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java 
index 274b558..d5239e8 100755 --- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java @@ -52,6 +52,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.commons.lang3.SerializationException; import org.jgroups.Address; @@ -877,7 +878,8 @@ public class JGroupsMessengerJUnitTest { initMocks(false); JChannel channel = messenger.myChannel; services.getConfig().getTransport().setOldDSMembershipInfo(new MembershipInformation(channel, - Collections.singleton(new InternalDistributedMember("localhost", 10000)))); + Collections.singleton(new InternalDistributedMember("localhost", 10000)), + new ConcurrentLinkedQueue<>())); JGroupsMessenger newMessenger = new JGroupsMessenger(); newMessenger.init(services); newMessenger.start(); diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java index 3a3debf..8baaf7f 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java @@ -1262,7 +1262,7 @@ public class InternalDistributedSystem extends DistributedSystem boolean isForcedDisconnect = dm.getRootCause() instanceof ForcedDisconnectException; boolean rejoined = false; this.reconnected = false; - if (isForcedDisconnect) { + if (isForcedDisconnect && !this.isReconnectingDS) { this.forcedDisconnect = true; resetReconnectAttemptCounter(); rejoined = tryReconnect(true, reason, GemFireCacheImpl.getInstance()); diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java index 579b011..2d1d401 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java @@ -1112,6 +1112,7 @@ public class InternalLocator extends Locator implements ConnectListener, LogConf endStartLocator(this.myDs); logger.info("Locator restart completed"); } + this.server.restartCompleted(newSystem); } public ClusterManagementService getClusterManagementService() { @@ -1265,6 +1266,15 @@ public class InternalLocator extends Locator implements ConnectListener, LogConf } @Override + public void restartCompleted(DistributedSystem ds) { + if (ds != null) { + for (TcpHandler handler : this.allHandlers) { + handler.restartCompleted(ds); + } + } + } + + @Override public Object processRequest(Object request) throws IOException { long giveup = 0; while (giveup == 0 || System.currentTimeMillis() < giveup) { diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocator.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocator.java index f5dcd52..fbea015 100755 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocator.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocator.java @@ -288,10 +288,12 @@ public class ServerLocator implements TcpHandler, DistributionAdvisee { this.loadSnapshot = new LocatorLoadSnapshot(); this.ds = (InternalDistributedSystem) ds; this.advisor = ControllerAdvisor.createControllerAdvisor(this); // escapes constructor but - // allows field to be final - if (ds.isConnected()) { - this.advisor.handshake(); // GEODE-1393: need to get server information during restart - } + } + } + + public void restartCompleted(DistributedSystem ds) { + if 
(ds.isConnected()) { + this.advisor.handshake(); // GEODE-1393: need to get server information during restart } } diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java index 22e3b73..6991efb 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java @@ -1062,7 +1062,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { // only respond if the intended recipient is this member InternalDistributedMember me = localAddress; - if (me.getVmViewId() >= 0 && m.getTarget().equals(me)) { + if (me == null || me.getVmViewId() >= 0 && m.getTarget().equals(me)) { HeartbeatMessage hm = new HeartbeatMessage(m.getRequestId()); hm.setRecipient(m.getSender()); Set<InternalDistributedMember> membersNotReceivedMsg = services.getMessenger().send(hm); diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocator.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocator.java index 659797a..407cff1 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocator.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocator.java @@ -178,6 +178,10 @@ public class GMSLocator implements Locator, NetLocator { @Override public void setIsCoordinator(boolean isCoordinator) { + if (isCoordinator) { + logger.info("Location services has received notification that this node is becoming" + + " membership coordinator"); + } this.isCoordinator = isCoordinator; } @@ -250,6 +254,9 @@ public class GMSLocator implements Locator, NetLocator { synchronized (registrants) { 
registrants.add(findRequest.getMemberID()); + if (recoveredView != null) { + recoveredView.remove(findRequest.getMemberID()); + } } if (v != null) { @@ -299,9 +306,7 @@ public class GMSLocator implements Locator, NetLocator { synchronized (registrants) { if (isCoordinator) { coordinator = localAddress; - if (v != null && localAddress != null && !localAddress.equals(v.getCoordinator())) { - logger.info("This member is becoming coordinator since view {}", v); v = null; fromView = false; } diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSQuorumChecker.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSQuorumChecker.java index 50b803d..14adc8d 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSQuorumChecker.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSQuorumChecker.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.logging.log4j.Logger; import org.jgroups.Address; @@ -55,6 +56,7 @@ public class GMSQuorumChecker implements QuorumChecker { private JGAddress myAddress; private final long partitionThreshold; private Set<DistributedMember> oldDistributedMemberIdentifiers; + private ConcurrentLinkedQueue<Message> messageQueue = new ConcurrentLinkedQueue<>(); public GMSQuorumChecker(NetView jgView, int partitionThreshold, JChannel channel, Set<DistributedMember> oldDistributedMemberIdentifiers) { @@ -125,7 +127,7 @@ public class GMSQuorumChecker implements QuorumChecker { @Override public MembershipInformation getMembershipInfo() { - return new MembershipInformation(channel, oldDistributedMemberIdentifiers); + return new MembershipInformation(channel, oldDistributedMemberIdentifiers, messageQueue); } private 
boolean calculateQuorum() { @@ -219,9 +221,15 @@ public class GMSQuorumChecker implements QuorumChecker { } } else if (pingPonger.isPongMessage(msgBytes)) { pongReceived(msg.getSrc()); + } else { + queueMessage(msg); } } + private void queueMessage(Message msg) { + messageQueue.add(msg); + } + @Override public void getState(OutputStream output) throws Exception {} diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java index 78ceba2..cc0b1cc 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java @@ -38,6 +38,7 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -85,6 +86,7 @@ import org.apache.geode.distributed.internal.membership.NetView; import org.apache.geode.distributed.internal.membership.QuorumChecker; import org.apache.geode.distributed.internal.membership.gms.GMSMember; import org.apache.geode.distributed.internal.membership.gms.Services; +import org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor; import org.apache.geode.distributed.internal.membership.gms.interfaces.MessageHandler; import org.apache.geode.distributed.internal.membership.gms.interfaces.Messenger; import org.apache.geode.distributed.internal.membership.gms.locator.FindCoordinatorRequest; @@ -171,6 +173,19 @@ public class JGroupsMessenger implements Messenger { */ private Set<DistributedMember> usedDistributedMemberIdentifiers = new HashSet<>(); + /** + * During reconnect a QuorumChecker holds the JGroups channel and responds to Ping + * 
and Pong messages but also queues any messages it doesn't recognize. These need + * to be delivered to handlers after membership services have been rebuilt. + */ + private Queue<Message> queuedMessagesFromReconnect; + + /** + * The JGroupsReceiver is handed messages by the JGroups Channel. It is responsible + * for deserializating and dispatching those messages to the appropriate handler + */ + private JGroupsReceiver jgroupsReceiver; + @Override @edu.umd.cs.findbugs.annotations.SuppressWarnings( value = "ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD") @@ -307,6 +322,7 @@ public class JGroupsMessenger implements Messenger { MembershipInformation oldInfo = (MembershipInformation) oldDSMembershipInfo; myChannel = oldInfo.getChannel(); usedDistributedMemberIdentifiers = oldInfo.getMembershipIdentifiers(); + queuedMessagesFromReconnect = oldInfo.getQueuedMessages(); // scrub the old channel ViewId vid = new ViewId(new JGAddress(), 0); @@ -343,7 +359,8 @@ public class JGroupsMessenger implements Messenger { try { myChannel.setReceiver(null); - myChannel.setReceiver(new JGroupsReceiver()); + jgroupsReceiver = new JGroupsReceiver(); + myChannel.setReceiver(jgroupsReceiver); if (!reconnecting) { myChannel.connect("AG"); // apache g***** (whatever we end up calling it) } @@ -385,7 +402,17 @@ public class JGroupsMessenger implements Messenger { } @Override - public void started() {} + public void started() { + if (queuedMessagesFromReconnect != null) { + logger.info("Delivering {} messages queued by quorum checker", + queuedMessagesFromReconnect.size()); + for (Message message : queuedMessagesFromReconnect) { + jgroupsReceiver.receive(message, true); + } + queuedMessagesFromReconnect.clear(); + queuedMessagesFromReconnect = null; + } + } @Override public void stop() { @@ -1224,6 +1251,10 @@ public class JGroupsMessenger implements Messenger { @Override public void receive(Message jgmsg) { + receive(jgmsg, false); + } + + private void receive(Message jgmsg, boolean 
fromQuorumChecker) { long startTime = DistributionStats.getStatTime(); try { if (services.getManager().shutdownInProgress()) { @@ -1277,7 +1308,13 @@ public class JGroupsMessenger implements Messenger { logger.trace("JGroupsMessenger dispatching {} from {}", msg, msg.getSender()); } filterIncomingMessage(msg); - getMessageHandler(msg).processMessage(msg); + MessageHandler handler = getMessageHandler(msg); + if (fromQuorumChecker && handler instanceof HealthMonitor) { + // ignore suspect / heartbeat messages that happened during + // auto-reconnect because they very likely have old member IDs in them + } else { + handler.processMessage(msg); + } // record the scheduling of broadcast messages NakAckHeader2 header = (NakAckHeader2) jgmsg.getHeader(nackack2HeaderId); diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/MembershipInformation.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/MembershipInformation.java index adcfc43..80bc6e7 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/MembershipInformation.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/MembershipInformation.java @@ -14,9 +14,11 @@ */ package org.apache.geode.distributed.internal.membership.gms.messenger; +import java.util.Queue; import java.util.Set; import org.jgroups.JChannel; +import org.jgroups.Message; import org.apache.geode.distributed.DistributedMember; @@ -27,12 +29,15 @@ import org.apache.geode.distributed.DistributedMember; public class MembershipInformation { private final JChannel channel; private final Set<DistributedMember> membershipIdentifiers; + private final Queue<Message> queuedMessages; protected MembershipInformation(JChannel channel, - Set<DistributedMember> oldMembershipIdentifiers) { + Set<DistributedMember> oldMembershipIdentifiers, + Queue<Message> queuedMessages) { this.channel = 
channel; this.membershipIdentifiers = oldMembershipIdentifiers; + this.queuedMessages = queuedMessages; } public JChannel getChannel() { @@ -42,4 +47,8 @@ public class MembershipInformation { public Set<DistributedMember> getMembershipIdentifiers() { return membershipIdentifiers; } + + public Queue<Message> getQueuedMessages() { + return this.queuedMessages; + } } diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java index a49ad87..c998374 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java @@ -2560,11 +2560,17 @@ public class GMSMembershipManager implements MembershipManager, Manager { shutdownCause); } + if (this.isReconnectingDS()) { + logger.info("Reconnecting system failed to connect"); + uncleanShutdown(reason, + new ForcedDisconnectException("reconnecting system failed to connect")); + return; + } + if (!services.getConfig().getDistributionConfig().getDisableAutoReconnect()) { saveCacheXmlForReconnect(); } - Thread reconnectThread = new LoggingThread("DisconnectThread", false, () -> { // stop server locators immediately since they may not have correct // information. 
This has caused client failures in bridge/wan diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpHandler.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpHandler.java index 424b3e4..1d19bf5 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpHandler.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpHandler.java @@ -51,6 +51,11 @@ public interface TcpHandler { InternalConfigurationPersistenceService sharedConfig); /** + * Informs the handler that restart has completed + */ + default void restartCompleted(DistributedSystem ds) {} + + /** * Initialize the handler with the TcpServer. Called before the TcpServer starts accepting * connections. */ diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java index 7b41d00..e21697a 100755 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java @@ -199,6 +199,10 @@ public class TcpServer { + System.identityHashCode(this.serverThread) + ";alive=" + this.serverThread.isAlive()); } + public void restartCompleted(InternalDistributedSystem ds) { + this.handler.restartCompleted(ds); + } + public void start() throws IOException { this.shuttingDown = false; startServerThread();