This is an automated email from the ASF dual-hosted git repository.
sai_boorlagadda pushed a commit to branch release/1.9.0
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/release/1.9.0 by this push:
new fca3d14 GEODE-6369 Cache-creation failure after a successful
auto-reconnect causes subsequent NPE
fca3d14 is described below
commit fca3d14fd69c34ae54524dda4807a09d533b82f1
Author: Bruce Schuchardt <[email protected]>
AuthorDate: Tue Feb 19 12:58:57 2019 -0800
GEODE-6369 Cache-creation failure after a successful auto-reconnect causes
subsequent NPE
If an error occurs while rebuilding the cache during auto-reconnect and we can't
continue, we now throw an exception to any thread waiting for the
reconnect to complete.
If we're unable to contact the cluster configuration service, we no longer
terminate auto-reconnect attempts; cache creation is retried instead.
New members are now only allowed to join after view preparation has
completed. This will reduce the number of "surprise members" and also
ensures that any old member IDs have been removed from the view.
We now retry findCoordinatorFromView only if the view has actually
changed. Otherwise we contact the locators again to see whether there are
new registrants.
Fixing the above exposed other problems in auto-reconnect:
* Messages were being thrown away by the location service quorum checker
during auto-reconnect. Some of these were "join" messages that needed
to be delivered to the new membership service.
* Registrants weren't being removed from the recovered membership view
in the locator. This confused restarting nodes because the recovered
membership view had stale information in it that they should not use.
* Restarts of locator services were hanging because profile interchange was
being done under synchronization.
---
.../geode/ClusterCommunicationsDUnitTest.java | 3 +-
.../apache/geode/cache30/ReconnectDUnitTest.java | 39 +++--
...ReconnectWithClusterConfigurationDUnitTest.java | 193 +++++++++++++++++++++
.../internal/membership/MembershipJUnitTest.java | 15 --
.../gms/membership/GMSJoinLeaveJUnitTest.java | 33 +---
.../gms/messenger/JGroupsMessengerJUnitTest.java | 4 +-
.../main/java/org/apache/geode/cache/Cache.java | 2 +
.../geode/distributed/DistributedSystem.java | 2 +
.../internal/InternalDistributedSystem.java | 95 ++++++----
.../distributed/internal/InternalLocator.java | 32 +++-
.../geode/distributed/internal/ServerLocator.java | 10 +-
.../internal/membership/MembershipManager.java | 6 +
.../internal/membership/gms/GMSUtil.java | 21 ---
.../internal/membership/gms/Services.java | 8 +
.../membership/gms/fd/GMSHealthMonitor.java | 8 +-
.../membership/gms/interfaces/Manager.java | 6 +
.../membership/gms/interfaces/Service.java | 1 +
.../membership/gms/locator/GMSLocator.java | 9 +-
.../membership/gms/membership/GMSJoinLeave.java | 102 ++++++-----
.../membership/gms/messenger/GMSQuorumChecker.java | 10 +-
.../membership/gms/messenger/JGroupsMessenger.java | 45 ++++-
.../gms/messenger/MembershipInformation.java | 11 +-
.../membership/gms/mgr/GMSMembershipManager.java | 27 ++-
.../distributed/internal/tcpserver/TcpHandler.java | 5 +
.../distributed/internal/tcpserver/TcpServer.java | 4 +
.../geode/internal/cache/CacheServerLauncher.java | 7 +-
.../geode/internal/cache/GemFireCacheImpl.java | 14 +-
.../org/apache/geode/internal/tcp/Connection.java | 22 ++-
28 files changed, 542 insertions(+), 192 deletions(-)
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java
b/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java
index b4d106e..c970f77 100644
---
a/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java
+++
b/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java
@@ -30,6 +30,7 @@ import static
org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTOR
import static
org.apache.geode.distributed.ConfigurationProperties.USE_CLUSTER_CONFIGURATION;
import static
org.apache.geode.internal.DataSerializableFixedID.SERIAL_ACKED_MESSAGE;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.DataInput;
@@ -216,7 +217,7 @@ public class ClusterCommunicationsDUnitTest implements
java.io.Serializable {
// System.setProperty("javax.net.debug", "all");
Properties props = getDistributedSystemProperties();
// locator must restart with the same port so that it reconnects to the
server
- await().atMost(15, TimeUnit.SECONDS)
+ await().atMost(getTimeout().getValueInMS(), TimeUnit.MILLISECONDS)
.until(() -> Locator.startLocatorAndDS(locatorPort, new File(""),
props) != null);
assertThat(Locator.getLocator().getDistributedSystem().getAllOtherMembers().size())
.isGreaterThan(0);
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/cache30/ReconnectDUnitTest.java
b/geode-core/src/distributedTest/java/org/apache/geode/cache30/ReconnectDUnitTest.java
index 9a7d32f..232d729 100755
---
a/geode-core/src/distributedTest/java/org/apache/geode/cache30/ReconnectDUnitTest.java
+++
b/geode-core/src/distributedTest/java/org/apache/geode/cache30/ReconnectDUnitTest.java
@@ -15,6 +15,8 @@
package org.apache.geode.cache30;
import static java.lang.System.out;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.geode.cache.DataPolicy.REPLICATE;
import static org.apache.geode.cache.LossAction.RECONNECT;
@@ -37,9 +39,11 @@ import static
org.apache.geode.distributed.Locator.getLocator;
import static
org.apache.geode.distributed.internal.membership.gms.MembershipManagerHelper.getMembershipManager;
import static
org.apache.geode.internal.cache.xmlcache.CacheXmlGenerator.generate;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
import static org.apache.geode.test.dunit.Host.getHost;
import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
import static org.apache.geode.test.dunit.ThreadUtils.join;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -63,6 +67,7 @@ import org.junit.experimental.categories.Category;
import org.apache.geode.CancelException;
import org.apache.geode.cache.AttributesFactory;
+import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.DataPolicy;
@@ -80,6 +85,7 @@ import org.apache.geode.cache.TimeoutException;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.Locator;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import
org.apache.geode.distributed.internal.InternalDistributedSystem.ReconnectListener;
@@ -401,7 +407,7 @@ public class ReconnectDUnitTest extends JUnit4CacheTestCase
{
System.out.println("ds.isReconnecting() = " + ds.isReconnecting());
boolean failure = true;
try {
- ds.waitUntilReconnected(60, SECONDS);
+ ds.waitUntilReconnected(getTimeout().getValueInMS(),
MILLISECONDS);
savedSystem = ds.getReconnectedSystem();
locator = (InternalLocator) getLocator();
assertTrue("Expected system to be restarted",
ds.getReconnectedSystem() != null);
@@ -480,15 +486,14 @@ public class ReconnectDUnitTest extends
JUnit4CacheTestCase {
/** this will throw an exception if location services aren't running */
private void ensureLocationServiceRunning(VM vm) {
- vm.invoke(new SerializableRunnable("ensureLocationServiceRunning") {
- @Override
- public void run() {
+ vm.invoke("ensureLocationServiceRunning", () -> {
+ await().untilAsserted(() -> {
InternalLocator intloc = (InternalLocator) locator;
ServerLocator serverLocator = intloc.getServerLocatorAdvisee();
// the initialization flag in the locator's ControllerAdvisor will
// be set if a handshake has been performed
assertTrue(serverLocator.getDistributionAdvisor().isInitialized());
- }
+ });
});
}
@@ -511,7 +516,7 @@ public class ReconnectDUnitTest extends JUnit4CacheTestCase
{
return "waiting for ds to begin reconnecting";
}
});
- long waitTime = 120;
+ long waitTime = 600;
System.out.println("VM" + VM.getCurrentVMNum() + " waiting up to "
+ waitTime + " seconds for reconnect to complete");
try {
@@ -1070,7 +1075,8 @@ public class ReconnectDUnitTest extends
JUnit4CacheTestCase {
ReconnectDUnitTest.savedCache = (GemFireCacheImpl) getCache();
Region myRegion = createRegion("myRegion", createAtts());
myRegion.put("MyKey", "MyValue");
- myRegion.getAttributesMutator().addCacheListener(new
CacheKillingListener());
+ myRegion.getAttributesMutator()
+ .addCacheListener(new CacheListenerTriggeringForcedDisconnect());
}
};
@@ -1098,7 +1104,7 @@ public class ReconnectDUnitTest extends
JUnit4CacheTestCase {
});
out.println("entering reconnect wait for " + cache);
try {
- cache.waitUntilReconnected(20, SECONDS);
+ cache.waitUntilReconnected(5, MINUTES);
} catch (InterruptedException e) {
fail("interrupted");
}
@@ -1155,11 +1161,10 @@ public class ReconnectDUnitTest extends
JUnit4CacheTestCase {
return "waiting for cache to begin reconnecting";
}
});
- try {
- cache.waitUntilReconnected(20, SECONDS);
- } catch (InterruptedException e) {
- fail("interrupted");
- }
+ assertThatThrownBy(() ->
cache.waitUntilReconnected(getTimeout().getValueInMS(), MILLISECONDS))
+ .isInstanceOf(CacheClosedException.class)
+ .hasMessageContaining("Cache could not be recreated")
+
.hasCauseExactlyInstanceOf(DistributedSystemDisconnectedException.class);
assertTrue(cache.getInternalDistributedSystem().isReconnectCancelled());
assertNull(cache.getReconnectedCache());
}
@@ -1290,7 +1295,7 @@ public class ReconnectDUnitTest extends
JUnit4CacheTestCase {
WaitCriterion wc = new WaitCriterion() {
@Override
public boolean done() {
- return msys.isReconnecting();
+ return msys.isReconnecting() || msys.getReconnectedSystem() !=
null;
}
@Override
@@ -1323,10 +1328,12 @@ public class ReconnectDUnitTest extends
JUnit4CacheTestCase {
}
/**
- * CacheKillingListener crashes the distributed system when it is invoked
for the first time.
+ * CacheListenerTriggeringForcedDisconnect crashes the distributed system
when it is invoked for
+ * the first time.
* After that it ignores any notifications.
*/
- public static class CacheKillingListener extends CacheListenerAdapter
implements Declarable {
+ public static class CacheListenerTriggeringForcedDisconnect extends
CacheListenerAdapter
+ implements Declarable {
public static int crashCount = 0;
@Override
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/cache30/ReconnectWithClusterConfigurationDUnitTest.java
b/geode-core/src/distributedTest/java/org/apache/geode/cache30/ReconnectWithClusterConfigurationDUnitTest.java
new file mode 100644
index 0000000..964383a
--- /dev/null
+++
b/geode-core/src/distributedTest/java/org/apache/geode/cache30/ReconnectWithClusterConfigurationDUnitTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache30;
+
+import static org.apache.geode.distributed.ConfigurationProperties.DISABLE_AUTO_RECONNECT;
+import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION;
+import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_NETWORK_PARTITION_DETECTION;
+import static org.apache.geode.distributed.ConfigurationProperties.HTTP_SERVICE_PORT;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
+import static org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.apache.geode.distributed.ConfigurationProperties.MEMBER_TIMEOUT;
+import static org.apache.geode.distributed.ConfigurationProperties.NAME;
+import static org.apache.geode.distributed.ConfigurationProperties.USE_CLUSTER_CONFIGURATION;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.distributed.Locator;
+import org.apache.geode.distributed.internal.InternalConfigurationPersistenceService;
+import org.apache.geode.distributed.internal.InternalLocator;
+import org.apache.geode.distributed.internal.membership.gms.MembershipManagerHelper;
+import org.apache.geode.internal.AvailablePort;
+import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.Assert;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.Disconnect;
+import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.Invoke;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+
+/**
+ * Verifies that members using cluster configuration recover through auto-reconnect after every
+ * member of the cluster, locators included, has been forcibly disconnected.
+ */
+public class ReconnectWithClusterConfigurationDUnitTest implements Serializable {
+  static final int NUM_LOCATORS = 2;
+  static final int NUM_VMS = 4;
+  static DistributedSystem system;
+  static Cache cache;
+  static Locator locator;
+  static int[] locatorPorts = new int[NUM_LOCATORS];
+  static Properties dsProperties;
+
+  @Rule
+  public DistributedRule distributedRule = DistributedRule.builder().withVMCount(NUM_VMS).build();
+
+  /**
+   * Reserves one port per locator, publishes the ports to every VM and starts a locator in each
+   * of the first {@link #NUM_LOCATORS} VMs.
+   */
+  @Before
+  public void setup() {
+    List<AvailablePort.Keeper> randomAvailableTCPPortKeepers =
+        AvailablePortHelper.getRandomAvailableTCPPortKeepers(NUM_LOCATORS);
+    for (int i = 0; i < NUM_LOCATORS; i++) {
+      AvailablePort.Keeper keeper = randomAvailableTCPPortKeepers.get(i);
+      locatorPorts[i] = keeper.getPort();
+    }
+    final int[] locPorts = locatorPorts;
+    Invoke.invokeInEveryVM("set locator ports", () -> locatorPorts = locPorts);
+    for (int i = 0; i < NUM_LOCATORS; i++) {
+      final int locatorNumber = i;
+      // give up the port reservation right before the locator binds to it
+      randomAvailableTCPPortKeepers.get(locatorNumber).release();
+      VM.getVM(i).invoke("start locator", () -> {
+        try {
+          Disconnect.disconnectFromDS();
+          dsProperties = null;
+          Properties props = getDistributedSystemProperties();
+          locator = Locator.startLocatorAndDS(locatorPorts[locatorNumber], new File(""), props);
+          system = locator.getDistributedSystem();
+          cache = ((InternalLocator) locator).getCache();
+          ReconnectDUnitTest.savedSystem = locator.getDistributedSystem();
+          IgnoredException.addIgnoredException(
+              "org.apache.geode.ForcedDisconnectException||Possible loss of quorum");
+        } catch (IOException e) {
+          Assert.fail("unable to start locator", e);
+        }
+      });
+    }
+  }
+
+  /**
+   * Stops the locators, destroying their shared configuration, and then disconnects any
+   * distributed system still held in each VM.
+   */
+  @After
+  public void teardown() {
+    for (int i = 0; i < NUM_LOCATORS; i++) {
+      VM.getVM(i).invoke(() -> {
+        InternalLocator locator = InternalLocator.getLocator();
+        if (locator != null) {
+          InternalConfigurationPersistenceService sharedConfig =
+              locator.getConfigurationPersistenceService();
+          if (sharedConfig != null) {
+            sharedConfig.destroySharedConfiguration();
+          }
+          locator.stop();
+        }
+      });
+    }
+    Invoke.invokeInEveryVM(() -> {
+      if (system != null) {
+        system.disconnect();
+      }
+      system = null;
+      cache = null;
+    });
+  }
+
+  /**
+   * Builds the distributed-system properties for the current VM (auto-reconnect, network
+   * partition detection and cluster configuration enabled; every locator listed) and stores
+   * them in {@link #dsProperties}.
+   *
+   * @return the properties that were just built
+   */
+  public Properties getDistributedSystemProperties() {
+    dsProperties = new Properties();
+    dsProperties.put(MAX_WAIT_TIME_RECONNECT, "" + (5000 * NUM_VMS));
+    dsProperties.put(ENABLE_NETWORK_PARTITION_DETECTION, "true");
+    dsProperties.put(DISABLE_AUTO_RECONNECT, "false");
+    dsProperties.put(ENABLE_CLUSTER_CONFIGURATION, "true");
+    dsProperties.put(USE_CLUSTER_CONFIGURATION, "true");
+    dsProperties.put(HTTP_SERVICE_PORT, "0");
+    StringBuilder stringBuilder = new StringBuilder();
+    stringBuilder.append("localHost[")
+        .append(locatorPorts[0])
+        .append(']');
+    for (int i = 1; i < NUM_LOCATORS; i++) {
+      // each additional locator must be listed with its own port
+      stringBuilder.append(",localHost[")
+          .append(locatorPorts[i])
+          .append(']');
+    }
+    dsProperties.put(LOCATORS, stringBuilder.toString());
+    dsProperties.put(MCAST_PORT, "0");
+    dsProperties.put(MEMBER_TIMEOUT, "5000");
+    dsProperties.put(LOG_LEVEL, "info");
+    int vmNumber = VM.getCurrentVMNum();
+    if (vmNumber < NUM_LOCATORS) {
+      dsProperties.put(NAME, "loc" + vmNumber);
+    } else {
+      dsProperties.put(NAME, "vm" + vmNumber);
+    }
+    return dsProperties;
+  }
+
+  /**
+   * Creates a cache in every non-locator VM, crashes the distributed system in every VM (locators
+   * included) and verifies that each member reconnects and again sees all of the other members.
+   */
+  @Test
+  public void testReconnectAfterMeltdown() throws InterruptedException {
+    for (int i = NUM_LOCATORS; i < NUM_VMS; i++) {
+      VM.getVM(i).invoke("create cache", () -> {
+        cache = new CacheFactory(getDistributedSystemProperties()).create();
+        system = cache.getDistributedSystem();
+      });
+    }
+    AsyncInvocation<?>[] crashers = new AsyncInvocation<?>[NUM_VMS];
+    for (int i = 0; i < NUM_VMS; i++) {
+      crashers[i] = VM.getVM(i).invokeAsync("crash",
+          () -> MembershipManagerHelper.crashDistributedSystem(system));
+    }
+    joinAndCheck(crashers);
+    AsyncInvocation<?>[] waiters = new AsyncInvocation<?>[NUM_VMS];
+    for (int i = NUM_VMS - 1; i >= 0; i--) {
+      waiters[i] = VM.getVM(i).invokeAsync("wait for reconnect", () -> {
+        // wait for auto-reconnect to begin (or to have already completed) before waiting for it
+        // to finish, as ReconnectDUnitTest does
+        await().until(() -> system.isReconnecting() || system.getReconnectedSystem() != null);
+        boolean reconnected = system.waitUntilReconnected(
+            GeodeAwaitility.getTimeout().getValueInMS(), TimeUnit.MILLISECONDS);
+        assertThat(reconnected)
+            .withFailMessage("distributed system did not reconnect within the timeout")
+            .isTrue();
+        system = system.getReconnectedSystem();
+        cache = cache.getReconnectedCache();
+        await().untilAsserted(() -> assertThat(system.getAllOtherMembers().size())
+            .withFailMessage("wrong number of members: " + system.getAllOtherMembers())
+            .isEqualTo(NUM_VMS - 1));
+      });
+    }
+    joinAndCheck(waiters);
+  }
+
+  /**
+   * Waits for every invocation to finish and fails if any of them threw. {@code join()} alone
+   * does not report failures that occurred in the remote VM.
+   *
+   * @param invocations the asynchronous invocations to wait for
+   * @throws InterruptedException if interrupted while waiting
+   */
+  private static void joinAndCheck(AsyncInvocation<?>[] invocations) throws InterruptedException {
+    for (AsyncInvocation<?> invocation : invocations) {
+      invocation.join();
+      if (invocation.exceptionOccurred()) {
+        throw new AssertionError("asynchronous invocation failed: " + invocation,
+            invocation.getException());
+      }
+    }
+  }
+}
diff --git
a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/MembershipJUnitTest.java
b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/MembershipJUnitTest.java
index 0c259f3..69184ff 100755
---
a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/MembershipJUnitTest.java
+++
b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/MembershipJUnitTest.java
@@ -47,7 +47,6 @@ import
org.apache.geode.distributed.internal.DistributionConfigImpl;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.InternalLocator;
import org.apache.geode.distributed.internal.SerialAckedMessage;
-import org.apache.geode.distributed.internal.membership.gms.GMSUtil;
import org.apache.geode.distributed.internal.membership.gms.ServiceConfig;
import org.apache.geode.distributed.internal.membership.gms.Services;
import
org.apache.geode.distributed.internal.membership.gms.interfaces.JoinLeave;
@@ -450,20 +449,6 @@ public class MembershipJUnitTest {
}
}
- /**
- * test the GMSUtil.formatBytes() method
- */
- @Test
- public void testFormatBytes() throws Exception {
- byte[] bytes = new byte[200];
- for (int i = 0; i < bytes.length; i++) {
- bytes[i] = (byte) (i % 255);
- }
- String str = GMSUtil.formatBytes(bytes, 0, bytes.length);
- System.out.println(str);
- assertEquals(600 + 4, str.length());
- }
-
@Test
public void testMessagesThrowExceptionIfProcessed() throws Exception {
ClusterDistributionManager dm = null;
diff --git
a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
index f59f677..b0481e7 100644
---
a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
+++
b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
@@ -165,6 +165,7 @@ public class GMSJoinLeaveJUnitTest {
gmsJoinLeave.init(services);
gmsJoinLeave.start();
gmsJoinLeave.started();
+ gmsJoinLeave.setLocalAddress(gmsJoinLeaveMemberId);
}
@After
@@ -333,13 +334,6 @@ public class GMSJoinLeaveJUnitTest {
gmsJoinLeave.processMessage(jrm);
// this should log..
Assert.assertEquals(jrm, joinResponse[0]);
-
- gmsJoinLeave.setJoinResponseMessage(null);
-
- jrm = new JoinResponseMessage(mockMembers[0], new NetView(), 0);
- gmsJoinLeave.processMessage(jrm);
- // this should log..
- Assert.assertEquals(jrm, joinResponse[0]);
}
/**
@@ -622,7 +616,7 @@ public class GMSJoinLeaveJUnitTest {
previousMemberId.setVmViewId(0);
NetView view = new NetView(mockMembers[0], 1,
createMemberList(mockMembers[0], previousMemberId, mockMembers[1]));
- InstallViewMessage viewMessage = new InstallViewMessage(view, 0, true);
+ InstallViewMessage viewMessage = new InstallViewMessage(view, 0, false);
viewMessage.setSender(mockMembers[0]);
gmsJoinLeave.processMessage(viewMessage);
assertEquals(0, gmsJoinLeaveMemberId.getVmViewId());
@@ -635,29 +629,6 @@ public class GMSJoinLeaveJUnitTest {
}
@Test
- public void testViewWithOldIDNotAcceptedAsJoinResponse() throws Exception {
- initMocks();
- when(messenger.isOldMembershipIdentifier(any(DistributedMember.class)))
- .thenReturn(Boolean.TRUE);
- List<InternalDistributedMember> mbrs = new LinkedList<>();
- Set<InternalDistributedMember> shutdowns = new HashSet<>();
- Set<InternalDistributedMember> crashes = new HashSet<>();
- mbrs.add(mockMembers[0]);
- mbrs.add(mockMembers[1]);
- mbrs.add(mockMembers[2]);
- InternalDistributedMember oldId = new InternalDistributedMember(
- gmsJoinLeaveMemberId.getInetAddress(), gmsJoinLeaveMemberId.getPort());
- oldId.setVmViewId(0);
- mbrs.add(oldId);
-
- // prepare the view
- NetView netView = new NetView(mockMembers[0], 1, mbrs, shutdowns, crashes);
- gmsJoinLeave.processMessage(new InstallViewMessage(netView, null, true));
- assertEquals(-1, gmsJoinLeaveMemberId.getVmViewId());
- verify(messenger).isOldMembershipIdentifier(isA(DistributedMember.class));
- }
-
- @Test
public void testRemoveCausesForcedDisconnect() throws Exception {
String reason = "testing";
initMocks();
diff --git
a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
index 274b558..d5239e8 100755
---
a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
+++
b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
@@ -52,6 +52,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.lang3.SerializationException;
import org.jgroups.Address;
@@ -877,7 +878,8 @@ public class JGroupsMessengerJUnitTest {
initMocks(false);
JChannel channel = messenger.myChannel;
services.getConfig().getTransport().setOldDSMembershipInfo(new
MembershipInformation(channel,
- Collections.singleton(new InternalDistributedMember("localhost",
10000))));
+ Collections.singleton(new InternalDistributedMember("localhost",
10000)),
+ new ConcurrentLinkedQueue<>()));
JGroupsMessenger newMessenger = new JGroupsMessenger();
newMessenger.init(services);
newMessenger.start();
diff --git a/geode-core/src/main/java/org/apache/geode/cache/Cache.java
b/geode-core/src/main/java/org/apache/geode/cache/Cache.java
index 91a2fc2..fc7f4f3 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/Cache.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/Cache.java
@@ -413,6 +413,8 @@ public interface Cache extends GemFireCache {
/**
* Wait for the Cache to finish reconnecting to the distributed system and
recreate a new Cache.
+ * This may throw a CacheClosedException if reconnect attempts fail due to
an exception. The
+ * exception will detail what went wrong.
*
* @see #getReconnectedCache
* @param time amount of time to wait, or -1 to wait forever
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/DistributedSystem.java
b/geode-core/src/main/java/org/apache/geode/distributed/DistributedSystem.java
index af73889..2b3c5ec 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/DistributedSystem.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/DistributedSystem.java
@@ -648,6 +648,8 @@ public abstract class DistributedSystem implements
StatisticsFactory {
/**
* Wait for the DistributedSystem to finish reconnecting to the system and
recreate the cache.
+ * This may throw a DistributedSystemDisconnectedException if reconnect
fails. The exception
+ * will detail what went wrong.
*
* @param time amount of time to wait, or -1 to wait forever
* @return true if the system was reconnected
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
index 1f6f45a..78d4619 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
@@ -58,7 +58,6 @@ import org.apache.geode.SystemConnectException;
import org.apache.geode.SystemFailure;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.CacheFactory;
-import org.apache.geode.cache.CacheXmlException;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
@@ -90,6 +89,7 @@ import org.apache.geode.internal.cache.execute.FunctionStats;
import org.apache.geode.internal.cache.execute.InternalFunctionService;
import org.apache.geode.internal.cache.tier.sockets.EncryptorImpl;
import org.apache.geode.internal.cache.xmlcache.CacheServerCreation;
+import
org.apache.geode.internal.config.ClusterConfigurationNotAvailableException;
import org.apache.geode.internal.logging.InternalLogWriter;
import org.apache.geode.internal.logging.LogConfig;
import org.apache.geode.internal.logging.LogConfigListener;
@@ -1262,7 +1262,7 @@ public class InternalDistributedSystem extends
DistributedSystem
boolean isForcedDisconnect = dm.getRootCause() instanceof
ForcedDisconnectException;
boolean rejoined = false;
this.reconnected = false;
- if (isForcedDisconnect) {
+ if (isForcedDisconnect && !this.isReconnectingDS) {
this.forcedDisconnect = true;
resetReconnectAttemptCounter();
rejoined = tryReconnect(true, reason, GemFireCacheImpl.getInstance());
@@ -2314,6 +2314,11 @@ public class InternalDistributedSystem extends
DistributedSystem
private volatile boolean reconnected = false;
/**
+ * If reconnect fails due to an exception it will be in this field
+ */
+ private Exception reconnectException;
+
+ /**
* Boolean indicating that this member has been shunned by other members or
a network partition
* has occurred
*/
@@ -2643,20 +2648,19 @@ public class InternalDistributedSystem extends
DistributedSystem
logger.warn("Exception occurred while trying to connect the system
during reconnect",
e);
attemptingToReconnect = false;
+ reconnectException = e;
return;
}
logger.warn("Caught SystemConnectException in reconnect", e);
continue;
} catch (GemFireConfigException e) {
- if (isDebugEnabled) {
- logger.debug("Attempt to reconnect failed with
GemFireConfigException");
- }
logger.warn("Caught GemFireConfigException in reconnect", e);
continue;
- } catch (Exception ee) {
+ } catch (Exception e) {
logger.warn("Exception occurred while trying to connect the system
during reconnect",
- ee);
+ e);
attemptingToReconnect = false;
+ reconnectException = e;
return;
} finally {
if (this.locatorDMTypeForced) {
@@ -2671,41 +2675,47 @@ public class InternalDistributedSystem extends
DistributedSystem
// Admin systems don't carry a cache, but for others we can now
create
// a cache
if (newDM.getDMType() !=
ClusterDistributionManager.ADMIN_ONLY_DM_TYPE) {
- try {
- CacheConfig config = new CacheConfig();
- if (cacheXML != null) {
- config.setCacheXMLDescription(cacheXML);
- }
- cache = GemFireCacheImpl.create(this.reconnectDS, config);
+ boolean retry;
+ do {
+ retry = false;
+ try {
+ CacheConfig config = new CacheConfig();
+ if (cacheXML != null) {
+ config.setCacheXMLDescription(cacheXML);
+ }
+ cache = GemFireCacheImpl.create(this.reconnectDS, config);
- if (!cache.isClosed()) {
- createAndStartCacheServers(cacheServerCreation, cache);
- if (cache.getCachePerfStats().getReliableRegionsMissing() ==
0) {
- reconnectAttemptCounter = 0;
+ if (!cache.isClosed()) {
+ createAndStartCacheServers(cacheServerCreation, cache);
+ if (cache.getCachePerfStats().getReliableRegionsMissing() ==
0) {
+ reconnectAttemptCounter = 0;
+ }
}
- }
- } catch (CacheXmlException e) {
- logger.warn("Exception occurred while trying to create the cache
during reconnect",
- e);
- reconnectDS.disconnect();
- reconnectDS = null;
- reconnectCancelled = true;
- break;
- } catch (CancelException ignor) {
- // If this reconnect is for required-roles the algorithm is
recursive and we
- // shouldn't retry at this level
- if (!forcedDisconnect) {
+ } catch (GemFireConfigException e) {
+ if (e.getCause() instanceof
ClusterConfigurationNotAvailableException) {
+ retry = true;
+ logger.info("Reconnected to the cluster but the cluster
configuration service "
+ + "isn't available - will retry creating the cache");
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e1) {
+ reconnectCancelled = true;
+ reconnectException = e;
+ break;
+ }
+ }
+ } catch (Exception e) {
+ // We need to give up because we'll probably get the same
exception in
+ // the next attempt to build the cache.
+ logger.warn(
+ "Exception occurred while trying to create the cache
during reconnect. Auto-reconnect is terminating.",
+ e);
+ reconnectCancelled = true;
+ reconnectException = e;
break;
}
- logger.warn("Exception occurred while trying to create the cache
during reconnect",
- ignor);
- reconnectDS.disconnect();
- reconnectDS = null;
- } catch (Exception e) {
- logger.warn("Exception occurred while trying to create the cache
during reconnect",
- e);
- }
+ } while (retry);
}
}
@@ -2716,6 +2726,8 @@ public class InternalDistributedSystem extends
DistributedSystem
} catch (InterruptedException e) {
logger.info("Reconnect thread has been interrupted - exiting");
Thread.currentThread().interrupt();
+ reconnectCancelled = true;
+ reconnectException = e;
return;
}
}
@@ -2745,6 +2757,11 @@ public class InternalDistributedSystem extends
DistributedSystem
} else {
System.setProperty(InternalLocator.INHIBIT_DM_BANNER, inhibitBanner);
}
+ dm.getMembershipManager().setReconnectCompleted(true);
+ InternalDistributedSystem newds = reconnectDS;
+ if (newds != null) {
+ newds.getDM().getMembershipManager().setReconnectCompleted(true);
+ }
if (quorumChecker != null) {
mbrMgr.releaseQuorumChecker(quorumChecker, reconnectDS);
}
@@ -2929,6 +2946,10 @@ public class InternalDistributedSystem extends
DistributedSystem
}
}
+ if (reconnectException != null) {
+ throw new DistributedSystemDisconnectedException(
+ "Reconnect attempts terminated due to exception",
reconnectException);
+ }
InternalDistributedSystem recon = this.reconnectDS;
return !attemptingToReconnect && recon != null && recon.isConnected();
}
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
index 89d5599..736da0a 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
@@ -950,7 +950,11 @@ public class InternalLocator extends Locator implements
ConnectListener, LogConf
Thread.sleep(5000);
}
logger.info("waiting for distributed system to reconnect...");
- restarted = ds.waitUntilReconnected(-1, TimeUnit.SECONDS);
+ try {
+ restarted = ds.waitUntilReconnected(-1, TimeUnit.SECONDS);
+ } catch (CancelException e) {
+ // reconnect attempt failed
+ }
if (restarted) {
logger.info("system restarted");
} else {
@@ -1047,7 +1051,12 @@ public class InternalLocator extends Locator implements
ConnectListener, LogConf
}
this.stoppedForReconnect = false;
}
- restartWithDS(newSystem, GemFireCacheImpl.getInstance());
+ try {
+ restartWithDS(newSystem, GemFireCacheImpl.getInstance());
+ } catch (CancelException e) {
+ this.stoppedForReconnect = true;
+ return false;
+ }
setLocator(this);
restarted = true;
}
@@ -1089,7 +1098,14 @@ public class InternalLocator extends Locator implements
ConnectListener, LogConf
this.myDs.setDependentLocator(this);
logger.info("Locator restart: initializing TcpServer");
- this.server.restarting(newSystem, newCache,
this.configurationPersistenceService);
+ try {
+ this.server.restarting(newSystem, newCache,
this.configurationPersistenceService);
+ } catch (CancelException e) {
+ this.myDs = null;
+ this.myCache = null;
+ logger.info("Locator restart: attempt to restart location services
failed", e);
+ throw e;
+ }
if (this.productUseLog.isClosed()) {
this.productUseLog.reopen();
}
@@ -1108,6 +1124,7 @@ public class InternalLocator extends Locator implements
ConnectListener, LogConf
endStartLocator(this.myDs);
logger.info("Locator restart completed");
}
+ this.server.restartCompleted(newSystem);
}
public ClusterManagementService getClusterManagementService() {
@@ -1261,6 +1278,15 @@ public class InternalLocator extends Locator implements
ConnectListener, LogConf
}
@Override
+ public void restartCompleted(DistributedSystem ds) {
+ if (ds != null) {
+ for (TcpHandler handler : this.allHandlers) {
+ handler.restartCompleted(ds);
+ }
+ }
+ }
+
+ @Override
public Object processRequest(Object request) throws IOException {
long giveup = 0;
while (giveup == 0 || System.currentTimeMillis() < giveup) {
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocator.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocator.java
index f5dcd52..fbea015 100755
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocator.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocator.java
@@ -288,10 +288,12 @@ public class ServerLocator implements TcpHandler,
DistributionAdvisee {
this.loadSnapshot = new LocatorLoadSnapshot();
this.ds = (InternalDistributedSystem) ds;
this.advisor = ControllerAdvisor.createControllerAdvisor(this); //
escapes constructor but
- //
allows field to be final
- if (ds.isConnected()) {
- this.advisor.handshake(); // GEODE-1393: need to get server
information during restart
- }
+ }
+ }
+
+ public void restartCompleted(DistributedSystem ds) {
+ if (ds.isConnected()) {
+ this.advisor.handshake(); // GEODE-1393: need to get server information
during restart
}
}
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/MembershipManager.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/MembershipManager.java
index 4ac997f..0a3b0b0 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/MembershipManager.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/MembershipManager.java
@@ -150,6 +150,12 @@ public interface MembershipManager {
/**
+ * informs the membership manager that a reconnect has been completed
+ */
+ public void setReconnectCompleted(boolean reconnectCompleted);
+
+
+ /**
* Determine whether GCS shutdown has commenced
*
* @return true if it is shutting down
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSUtil.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSUtil.java
index d7366f5..0e14918 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSUtil.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSUtil.java
@@ -128,25 +128,4 @@ public class GMSUtil {
return sb.toString();
}
-
- /**
- * Formats the bytes in a buffer into hex octets, 50 per line
- */
- public static String formatBytes(byte[] buf, int startIndex, int length) {
- StringBuilder w = new StringBuilder(20000);
- int count = 0;
- for (int i = startIndex; i < length; i++, count++) {
- String s = Integer.toHexString(buf[i] & 0xff);
- if (s.length() == 1) {
- w.append('0');
- }
- w.append(s).append(' ');
- if ((count % 50) == 49) {
- w.append("\n");
- }
- }
- return w.toString();
- }
-
-
}
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/Services.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/Services.java
index 051e4aa..e8bc0b9 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/Services.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/Services.java
@@ -180,6 +180,14 @@ public class Services {
}
}
+ public void setLocalAddress(InternalDistributedMember address) {
+ this.auth.setLocalAddress(address);
+ this.messenger.setLocalAddress(address);
+ this.joinLeave.setLocalAddress(address);
+ this.healthMon.setLocalAddress(address);
+ this.manager.setLocalAddress(address);
+ }
+
public void emergencyClose() {
if (this.stopping) {
return;
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index 22e3b73..ab72a07 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -317,7 +317,8 @@ public class GMSHealthMonitor implements HealthMonitor,
MessageHandler {
if (playingDead) {
logger.debug("HealthMonitor: simulating sick member in health
check");
} else if (uuidLSBs == myUUID.getLeastSignificantBits()
- && uuidMSBs == myUUID.getMostSignificantBits() && vmViewId ==
myVmViewId) {
+ && uuidMSBs == myUUID.getMostSignificantBits()
+ && (vmViewId == myVmViewId || myVmViewId < 0)) {
logger.debug("HealthMonitor: sending OK reply");
out.write(OK);
out.flush();
@@ -1009,7 +1010,8 @@ public class GMSHealthMonitor implements HealthMonitor,
MessageHandler {
stopServices();
}
- void setLocalAddress(InternalDistributedMember idm) {
+ @Override
+ public void setLocalAddress(InternalDistributedMember idm) {
this.localAddress = idm;
}
@@ -1062,7 +1064,7 @@ public class GMSHealthMonitor implements HealthMonitor,
MessageHandler {
// only respond if the intended recipient is this member
InternalDistributedMember me = localAddress;
- if (me.getVmViewId() >= 0 && m.getTarget().equals(me)) {
+ if (me == null || me.getVmViewId() >= 0 && m.getTarget().equals(me)) {
HeartbeatMessage hm = new HeartbeatMessage(m.getRequestId());
hm.setRecipient(m.getSender());
Set<InternalDistributedMember> membersNotReceivedMsg =
services.getMessenger().send(hm);
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/interfaces/Manager.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/interfaces/Manager.java
index 6303dfc..4a0ef6b 100755
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/interfaces/Manager.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/interfaces/Manager.java
@@ -110,4 +110,10 @@ public interface Manager extends Service, MessageHandler {
*/
boolean isReconnectingDS();
+ /**
+ * If this.isReconnectingDS() then this method will inform whether the
reconnect
+ * has completed
+ */
+ boolean isReconnectCompleted();
+
}
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/interfaces/Service.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/interfaces/Service.java
index 0b7d2c2..403518d 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/interfaces/Service.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/interfaces/Service.java
@@ -78,4 +78,5 @@ public interface Service {
String reason);
+ default void setLocalAddress(InternalDistributedMember address) {}
}
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocator.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocator.java
index 659797a..407cff1 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocator.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocator.java
@@ -178,6 +178,10 @@ public class GMSLocator implements Locator, NetLocator {
@Override
public void setIsCoordinator(boolean isCoordinator) {
+ if (isCoordinator) {
+ logger.info("Location services has received notification that this node
is becoming"
+ + " membership coordinator");
+ }
this.isCoordinator = isCoordinator;
}
@@ -250,6 +254,9 @@ public class GMSLocator implements Locator, NetLocator {
synchronized (registrants) {
registrants.add(findRequest.getMemberID());
+ if (recoveredView != null) {
+ recoveredView.remove(findRequest.getMemberID());
+ }
}
if (v != null) {
@@ -299,9 +306,7 @@ public class GMSLocator implements Locator, NetLocator {
synchronized (registrants) {
if (isCoordinator) {
coordinator = localAddress;
-
if (v != null && localAddress != null &&
!localAddress.equals(v.getCoordinator())) {
- logger.info("This member is becoming coordinator since view {}", v);
v = null;
fromView = false;
}
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index df845df..261bb70 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -264,6 +264,7 @@ public class GMSJoinLeave implements JoinLeave,
MessageHandler {
int locatorsContacted = 0;
boolean hasContactedAJoinedLocator;
NetView view;
+ int lastFindCoordinatorInViewId = -1000;
final Set<FindCoordinatorResponse> responses = new HashSet<>();
public int responsesExpected;
@@ -459,8 +460,7 @@ public class GMSJoinLeave implements JoinLeave,
MessageHandler {
throw new GemFireSecurityException(failReason);
}
- // there is no way we can rech here right now
- throw new RuntimeException("Join Request Failed with response " +
joinResponse[0]);
+ throw new RuntimeException("Join Request Failed with response " +
response);
}
private JoinResponseMessage waitForJoinResponse() throws
InterruptedException {
@@ -476,29 +476,31 @@ public class GMSJoinLeave implements JoinLeave,
MessageHandler {
}
response = joinResponse[0];
- if (response != null && response.getCurrentView() != null && !isJoined) {
- // reset joinResponse[0]
- joinResponse[0] = null;
- // we got view here that means either we have to wait for
- NetView v = response.getCurrentView();
- InternalDistributedMember coord = v.getCoordinator();
- if (searchState.alreadyTried.contains(coord)) {
- searchState.view = response.getCurrentView();
- // we already sent join request to it..so lets wait some more time
here
- // assuming we got this response immediately, so wait for same
timeout here..
- long timeout = Math.max(services.getConfig().getMemberTimeout(),
- services.getConfig().getJoinTimeout() / 5);
- joinResponse.wait(timeout);
- response = joinResponse[0];
- } else {
- // try on this coordinator
- searchState.view = response.getCurrentView();
- response = null;
+ if
(services.getConfig().getDistributionConfig().getSecurityUDPDHAlgo().length() >
0) {
+ if (response != null && response.getCurrentView() != null &&
!isJoined) {
+ // reset joinResponse[0]
+ joinResponse[0] = null;
+ // we got view here that means either we have to wait for
+ NetView v = response.getCurrentView();
+ InternalDistributedMember coord = v.getCoordinator();
+ if (searchState.alreadyTried.contains(coord)) {
+ searchState.view = response.getCurrentView();
+ // we already sent join request to it..so lets wait some more time
here
+ // assuming we got this response immediately, so wait for same
timeout here..
+ long timeout = Math.max(services.getConfig().getMemberTimeout(),
+ services.getConfig().getJoinTimeout() / 5);
+ joinResponse.wait(timeout);
+ response = joinResponse[0];
+ } else {
+ // try on this coordinator
+ searchState.view = response.getCurrentView();
+ response = null;
+ }
+ searchState.view = v;
+ }
+ if (isJoined) {
+ return null;
}
- searchState.view = v;
- }
- if (isJoined) {
- return null;
}
}
return response;
@@ -616,7 +618,7 @@ public class GMSJoinLeave implements JoinLeave,
MessageHandler {
services.getHealthMonitor().getMembersFailingAvailabilityCheck();
check.removeAll(suspectMembers);
logger.info(
- "View with removed and left members removed is {}\nremoved members:
{}\nleft members: {}\nsuspect members: {}",
+ "View with removed and left members removed is {}; removed members:
{}; left members: {}; suspect members: {}",
check, removedMembers, leftMembers, suspectMembers);
if (check.getCoordinator().equals(localAddress)) {
synchronized (viewInstallationLock) {
@@ -995,6 +997,7 @@ public class GMSJoinLeave implements JoinLeave,
MessageHandler {
}
private void processViewMessage(final InstallViewMessage m) {
+ logger.debug("processing membership view message {}", m);
NetView view = m.getView();
@@ -1016,12 +1019,11 @@ public class GMSJoinLeave implements JoinLeave,
MessageHandler {
}
boolean viewContainsMyNewAddress = false;
- if (!this.isJoined) {
+ if (!this.isJoined && !m.isPreparing()) {
// if we're still waiting for a join response and we're in this view we
// should install the view so join() can finish its work
for (InternalDistributedMember mbr : view.getMembers()) {
- if (localAddress.equals(mbr)
- && !services.getMessenger().isOldMembershipIdentifier(mbr)) {
+ if (localAddress.equals(mbr)) {
viewContainsMyNewAddress = true;
break;
}
@@ -1030,12 +1032,24 @@ public class GMSJoinLeave implements JoinLeave,
MessageHandler {
if (m.isPreparing()) {
if (this.preparedView != null && this.preparedView.getViewId() >=
view.getViewId()) {
- services.getMessenger()
- .send(new ViewAckMessage(view.getViewId(), m.getSender(),
this.preparedView));
+ if (this.preparedView.getViewId() == view.getViewId() &&
+ this.preparedView.getCreator().equals(view.getCreator())) {
+ // this can happen if we received two prepares during auto-reconnect
+ } else {
+ services.getMessenger()
+ .send(new ViewAckMessage(view.getViewId(), m.getSender(),
this.preparedView));
+ }
} else {
this.preparedView = view;
- if (viewContainsMyNewAddress) {
- installView(view); // this will notifyAll the joinResponse
+ // complete filling in the member ID of this node, if possible
+ for (InternalDistributedMember mbr : view.getMembers()) {
+ if (this.localAddress.equals(mbr)) {
+ this.birthViewId = mbr.getVmViewId();
+ this.localAddress.setVmViewId(this.birthViewId);
+ GMSMember me = (GMSMember) this.localAddress.getNetMember();
+ me.setBirthViewId(birthViewId);
+ break;
+ }
}
ackView(m);
}
@@ -1096,12 +1110,10 @@ public class GMSJoinLeave implements JoinLeave,
MessageHandler {
assert this.localAddress != null;
- // If we've already tried to bootstrap from locators that
- // haven't joined the system (e.g., a collocated locator)
- // then jump to using the membership view to try to find
- // the coordinator
if (!state.hasContactedAJoinedLocator && state.registrants.size() >=
locators.size()
- && state.view != null) {
+ && state.view != null && state.viewId >
state.lastFindCoordinatorInViewId) {
+ state.lastFindCoordinatorInViewId = state.viewId;
+ logger.info("using findCoordinatorFromView");
return findCoordinatorFromView();
}
@@ -1163,7 +1175,8 @@ public class GMSJoinLeave implements JoinLeave,
MessageHandler {
// the QuorumChecker would have contacted a quorum of live nodes
and one of
// them should already be the coordinator, or should become the
coordinator soon
boolean isMyOldAddress =
- services.getConfig().isReconnecting() &&
localAddress.equals(responseCoordinator);
+ services.getConfig().isReconnecting() &&
localAddress.equals(responseCoordinator)
+ && responseCoordinator.getVmViewId() >= 0;
if (!isMyOldAddress) {
possibleCoordinators.add(response.getCoordinator());
}
@@ -1224,6 +1237,8 @@ public class GMSJoinLeave implements JoinLeave,
MessageHandler {
}
}
}
+ logger.info("findCoordinator chose {} out of these possible coordinators:
{}",
+ state.possibleCoordinator, possibleCoordinators);
return true;
}
@@ -1348,9 +1363,12 @@ public class GMSJoinLeave implements JoinLeave,
MessageHandler {
// 2. Member which was coordinator but just now some other member
became coordinator
// 3. we got message with secret key, but still view is coming and
that will inform the
// joining thread
- if (rsp.getRejectionMessage() != null || rsp.getCurrentView() != null)
{
+ if (rsp.getRejectionMessage() != null) {
joinResponse[0] = rsp;
joinResponse.notifyAll();
+ } else if (rsp.getCurrentView() != null) {
+ // ignore - we get to join when we receive a view. Joining earlier
may
+ // confuse other members if we've reused an old address
} else {
// we got secret key lets add it
services.getMessenger().setClusterSecretKey(rsp.getSecretPk());
@@ -1646,8 +1664,10 @@ public class GMSJoinLeave implements JoinLeave,
MessageHandler {
public void start() {}
@Override
- public void started() {
- this.localAddress = services.getMessenger().getMemberID();
+ public void started() {}
+
+ public void setLocalAddress(InternalDistributedMember address) {
+ this.localAddress = address;
GMSMember mbr = (GMSMember) this.localAddress.getNetMember();
if (services.getConfig().areLocatorsPreferredAsCoordinators()) {
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSQuorumChecker.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSQuorumChecker.java
index 50b803d..14adc8d 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSQuorumChecker.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSQuorumChecker.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.logging.log4j.Logger;
import org.jgroups.Address;
@@ -55,6 +56,7 @@ public class GMSQuorumChecker implements QuorumChecker {
private JGAddress myAddress;
private final long partitionThreshold;
private Set<DistributedMember> oldDistributedMemberIdentifiers;
+ private ConcurrentLinkedQueue<Message> messageQueue = new
ConcurrentLinkedQueue<>();
public GMSQuorumChecker(NetView jgView, int partitionThreshold, JChannel
channel,
Set<DistributedMember> oldDistributedMemberIdentifiers) {
@@ -125,7 +127,7 @@ public class GMSQuorumChecker implements QuorumChecker {
@Override
public MembershipInformation getMembershipInfo() {
- return new MembershipInformation(channel, oldDistributedMemberIdentifiers);
+ return new MembershipInformation(channel, oldDistributedMemberIdentifiers,
messageQueue);
}
private boolean calculateQuorum() {
@@ -219,9 +221,15 @@ public class GMSQuorumChecker implements QuorumChecker {
}
} else if (pingPonger.isPongMessage(msgBytes)) {
pongReceived(msg.getSrc());
+ } else {
+ queueMessage(msg);
}
}
+ private void queueMessage(Message msg) {
+ messageQueue.add(msg);
+ }
+
@Override
public void getState(OutputStream output) throws Exception {}
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index 78ceba2..cf526f4 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -38,6 +38,7 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -85,6 +86,7 @@ import
org.apache.geode.distributed.internal.membership.NetView;
import org.apache.geode.distributed.internal.membership.QuorumChecker;
import org.apache.geode.distributed.internal.membership.gms.GMSMember;
import org.apache.geode.distributed.internal.membership.gms.Services;
+import
org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor;
import
org.apache.geode.distributed.internal.membership.gms.interfaces.MessageHandler;
import
org.apache.geode.distributed.internal.membership.gms.interfaces.Messenger;
import
org.apache.geode.distributed.internal.membership.gms.locator.FindCoordinatorRequest;
@@ -171,6 +173,19 @@ public class JGroupsMessenger implements Messenger {
*/
private Set<DistributedMember> usedDistributedMemberIdentifiers = new
HashSet<>();
+ /**
+ * During reconnect a QuorumChecker holds the JGroups channel and responds
to Ping
+ * and Pong messages but also queues any messages it doesn't recognize.
These need
+ * to be delivered to handlers after membership services have been rebuilt.
+ */
+ private Queue<Message> queuedMessagesFromReconnect;
+
+ /**
+ * The JGroupsReceiver is handed messages by the JGroups Channel. It is
responsible
+ * for deserializating and dispatching those messages to the appropriate
handler
+ */
+ private JGroupsReceiver jgroupsReceiver;
+
@Override
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
value = "ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD")
@@ -307,6 +322,7 @@ public class JGroupsMessenger implements Messenger {
MembershipInformation oldInfo = (MembershipInformation)
oldDSMembershipInfo;
myChannel = oldInfo.getChannel();
usedDistributedMemberIdentifiers = oldInfo.getMembershipIdentifiers();
+ queuedMessagesFromReconnect = oldInfo.getQueuedMessages();
// scrub the old channel
ViewId vid = new ViewId(new JGAddress(), 0);
@@ -343,7 +359,8 @@ public class JGroupsMessenger implements Messenger {
try {
myChannel.setReceiver(null);
- myChannel.setReceiver(new JGroupsReceiver());
+ jgroupsReceiver = new JGroupsReceiver();
+ myChannel.setReceiver(jgroupsReceiver);
if (!reconnecting) {
myChannel.connect("AG"); // apache g***** (whatever we end up calling
it)
}
@@ -385,7 +402,17 @@ public class JGroupsMessenger implements Messenger {
}
@Override
- public void started() {}
+ public void started() {
+ if (queuedMessagesFromReconnect != null) {
+ logger.info("Delivering {} messages queued by quorum checker",
+ queuedMessagesFromReconnect.size());
+ for (Message message : queuedMessagesFromReconnect) {
+ jgroupsReceiver.receive(message, true);
+ }
+ queuedMessagesFromReconnect.clear();
+ queuedMessagesFromReconnect = null;
+ }
+ }
@Override
public void stop() {
@@ -527,6 +554,8 @@ public class JGroupsMessenger implements Messenger {
gmsMember.setMemberWeight((byte) (services.getConfig().getMemberWeight() &
0xff));
gmsMember.setNetworkPartitionDetectionEnabled(
services.getConfig().getDistributionConfig().getEnableNetworkPartitionDetection());
+
+ services.setLocalAddress(localAddress);
}
@Override
@@ -1224,6 +1253,10 @@ public class JGroupsMessenger implements Messenger {
@Override
public void receive(Message jgmsg) {
+ receive(jgmsg, false);
+ }
+
+ private void receive(Message jgmsg, boolean fromQuorumChecker) {
long startTime = DistributionStats.getStatTime();
try {
if (services.getManager().shutdownInProgress()) {
@@ -1277,7 +1310,13 @@ public class JGroupsMessenger implements Messenger {
logger.trace("JGroupsMessenger dispatching {} from {}", msg,
msg.getSender());
}
filterIncomingMessage(msg);
- getMessageHandler(msg).processMessage(msg);
+ MessageHandler handler = getMessageHandler(msg);
+ if (fromQuorumChecker && handler instanceof HealthMonitor) {
+ // ignore suspect / heartbeat messages that happened during
+ // auto-reconnect because they very likely have old member IDs in
them
+ } else {
+ handler.processMessage(msg);
+ }
// record the scheduling of broadcast messages
NakAckHeader2 header = (NakAckHeader2)
jgmsg.getHeader(nackack2HeaderId);
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/MembershipInformation.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/MembershipInformation.java
index adcfc43..80bc6e7 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/MembershipInformation.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/MembershipInformation.java
@@ -14,9 +14,11 @@
*/
package org.apache.geode.distributed.internal.membership.gms.messenger;
+import java.util.Queue;
import java.util.Set;
import org.jgroups.JChannel;
+import org.jgroups.Message;
import org.apache.geode.distributed.DistributedMember;
@@ -27,12 +29,15 @@ import org.apache.geode.distributed.DistributedMember;
public class MembershipInformation {
private final JChannel channel;
private final Set<DistributedMember> membershipIdentifiers;
+ private final Queue<Message> queuedMessages;
protected MembershipInformation(JChannel channel,
- Set<DistributedMember> oldMembershipIdentifiers) {
+ Set<DistributedMember> oldMembershipIdentifiers,
+ Queue<Message> queuedMessages) {
this.channel = channel;
this.membershipIdentifiers = oldMembershipIdentifiers;
+ this.queuedMessages = queuedMessages;
}
public JChannel getChannel() {
@@ -42,4 +47,8 @@ public class MembershipInformation {
public Set<DistributedMember> getMembershipIdentifiers() {
return membershipIdentifiers;
}
+
+ public Queue<Message> getQueuedMessages() {
+ return this.queuedMessages;
+ }
}
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
index d8f3353..c998374 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
@@ -107,6 +107,12 @@ public class GMSMembershipManager implements
MembershipManager, Manager {
private boolean wasReconnectingSystem;
/**
+ * This indicates that the DistributedSystem using this membership manager
performed
+ * a successful auto-reconnect. This may include successful recreation of a
Cache
+ */
+ private boolean reconnectCompleted;
+
+ /**
* A quorum checker is created during reconnect and is held here so it is
available to the UDP
* protocol for passing off the ping-pong responses used in the
quorum-checking algorithm.
*/
@@ -1785,7 +1791,7 @@ public class GMSMembershipManager implements
MembershipManager, Manager {
*/
@Override
public boolean isReconnectingDS() {
- return !this.hasJoined && this.wasReconnectingSystem;
+ return this.wasReconnectingSystem && !this.reconnectCompleted;
}
@Override
@@ -2180,6 +2186,17 @@ public class GMSMembershipManager implements
MembershipManager, Manager {
this.tcpDisabled = false;
}
+ @Override
+ public void setReconnectCompleted(boolean reconnectCompleted) {
+ this.reconnectCompleted = reconnectCompleted;
+ }
+
+ @Override
+ public boolean isReconnectCompleted() {
+ return reconnectCompleted;
+ }
+
+
/*
* non-thread-owned serial channels and high priority channels are not
included
*/
@@ -2543,11 +2560,17 @@ public class GMSMembershipManager implements
MembershipManager, Manager {
shutdownCause);
}
+ if (this.isReconnectingDS()) {
+ logger.info("Reconnecting system failed to connect");
+ uncleanShutdown(reason,
+ new ForcedDisconnectException("reconnecting system failed to
connect"));
+ return;
+ }
+
if
(!services.getConfig().getDistributionConfig().getDisableAutoReconnect()) {
saveCacheXmlForReconnect();
}
-
Thread reconnectThread = new LoggingThread("DisconnectThread", false, ()
-> {
// stop server locators immediately since they may not have correct
// information. This has caused client failures in bridge/wan
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpHandler.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpHandler.java
index 424b3e4..1d19bf5 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpHandler.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpHandler.java
@@ -51,6 +51,11 @@ public interface TcpHandler {
InternalConfigurationPersistenceService sharedConfig);
/**
+ * Informs the handler that restart has completed
+ */
+ default void restartCompleted(DistributedSystem ds) {}
+
+ /**
* Initialize the handler with the TcpServer. Called before the TcpServer
starts accepting
* connections.
*/
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
index 7b41d00..e21697a 100755
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
@@ -199,6 +199,10 @@ public class TcpServer {
+ System.identityHashCode(this.serverThread) + ";alive=" +
this.serverThread.isAlive());
}
+ public void restartCompleted(InternalDistributedSystem ds) {
+ this.handler.restartCompleted(ds);
+ }
+
public void start() throws IOException {
this.shuttingDown = false;
startServerThread();
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerLauncher.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerLauncher.java
index 30c2123..bed73da 100755
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerLauncher.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerLauncher.java
@@ -35,6 +35,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import org.apache.geode.CancelException;
import org.apache.geode.LogWriter;
import org.apache.geode.SystemFailure;
import org.apache.geode.cache.Cache;
@@ -728,7 +729,11 @@ public class CacheServerLauncher {
// system.isReconnecting());
boolean reconnected = false;
if (system.isReconnecting()) {
- reconnected = system.waitUntilReconnected(-1, TimeUnit.SECONDS);
+ try {
+ reconnected = system.waitUntilReconnected(-1, TimeUnit.SECONDS);
+ } catch (CancelException e) {
+ // reconnect failed
+ }
if (reconnected) {
system = (InternalDistributedSystem) system.getReconnectedSystem();
cache = system.getCache();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index fa6fc2a..f3d1187 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -2462,12 +2462,16 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
@Override
public boolean waitUntilReconnected(long time, TimeUnit units) throws
InterruptedException {
- boolean systemReconnected = this.system.waitUntilReconnected(time, units);
- if (!systemReconnected) {
- return false;
+ try {
+ boolean systemReconnected = this.system.waitUntilReconnected(time,
units);
+ if (!systemReconnected) {
+ return false;
+ }
+ GemFireCacheImpl cache = getInstance();
+ return cache != null && cache.isInitialized();
+ } catch (CancelException e) {
+ throw new CacheClosedException("Cache could not be recreated", e);
}
- GemFireCacheImpl cache = getInstance();
- return cache != null && cache.isInitialized();
}
@Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index a299fa6..a6715c4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -46,6 +46,7 @@ import javax.net.ssl.SSLException;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
+import org.apache.geode.SerializationException;
import org.apache.geode.SystemFailure;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.distributed.DistributedMember;
@@ -2824,7 +2825,7 @@ public class Connection implements Runnable {
Header header = msgReader.readHeader();
- ReplyMessage msg;
+ ReplyMessage msg = null;
int len;
if (header.getMessageType() == NORMAL_MSG_TYPE) {
msg = (ReplyMessage) msgReader.readMessage(header);
@@ -2934,8 +2935,12 @@ public class Connection implements Runnable {
peerDataBuffer.limit(startPos + messageLength);
if (this.handshakeRead) {
- readMessage(peerDataBuffer);
-
+ try {
+ readMessage(peerDataBuffer);
+ } catch (SerializationException e) {
+ logger.info("input buffer startPos {} oldLimit {}", startPos,
oldLimit);
+ throw e;
+ }
} else {
ByteBufferInputStream bbis = new
ByteBufferInputStream(peerDataBuffer);
DataInputStream dis = new DataInputStream(bbis);
@@ -3120,7 +3125,16 @@ public class Connection implements Runnable {
ReplyProcessor21.initMessageRPId();
// add serialization stats
long startSer =
this.owner.getConduit().getStats().startMsgDeserialization();
- msg = (DistributionMessage) InternalDataSerializer.readDSFID(bbis);
+ int startingPosition = peerDataBuffer.position();
+ try {
+ msg = (DistributionMessage) InternalDataSerializer.readDSFID(bbis);
+ } catch (SerializationException e) {
+ logger.info("input buffer starting position {} "
+ + " current position {} limit {} capacity {} message length {}",
+ startingPosition, peerDataBuffer.position(),
peerDataBuffer.limit(),
+ peerDataBuffer.capacity(), messageLength);
+ throw e;
+ }
this.owner.getConduit().getStats().endMsgDeserialization(startSer);
if (bbis.available() != 0) {
logger.warn("Message deserialization of {} did not read {} bytes.",