This is an automated email from the ASF dual-hosted git repository.
bschuchardt pushed a commit to branch support/1.13
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.13 by this push:
new 64b727e GEODE-8195: ConcurrentModificationException from
LocatorMembershipListenerImpl (#5306)
64b727e is described below
commit 64b727e105117d806b6e680ec5f2b0f9bbce0afd
Author: Bruce Schuchardt <[email protected]>
AuthorDate: Fri Jun 26 07:47:23 2020 -0700
GEODE-8195: ConcurrentModificationException from
LocatorMembershipListenerImpl (#5306)
I've replaced the "for" loop using an implicit Iterator with one using an
explicit Iterator so that its safe "remove()" method can be used. The
Iterator.remove() method is documented as the only safe way to modify the
collection while iterating over its contents.
I've also modified a test to validate the fix. The test forces a
failure to send two messages to an address. The failures are then
handled in the code that was throwing the
ConcurrentModificationException and, since there are two failures,
it causes two removals to be performed on the failedMessages collection.
(cherry picked from commit 3cda1b1a213f2195ff0b97361883f6a6c3972b14)
---
.../locator/wan/LocatorMembershipListenerImpl.java | 14 ++--
.../locator/wan/LocatorMembershipListenerTest.java | 88 ++++++++++++++++------
2 files changed, 72 insertions(+), 30 deletions(-)
diff --git
a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java
b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java
index acb4f0a..253f6dc 100644
---
a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java
+++
b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java
@@ -18,6 +18,7 @@ package org.apache.geode.cache.client.internal.locator.wan;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -295,15 +296,15 @@ public class LocatorMembershipListenerImpl implements
LocatorMembershipListener
public void run() {
Map<DistributionLocatorId, Set<LocatorJoinMessage>> failedMessages = new
HashMap<>();
for (Map.Entry<Integer, Set<DistributionLocatorId>> entry :
remoteLocators.entrySet()) {
- for (DistributionLocatorId value : entry.getValue()) {
+ for (DistributionLocatorId remoteLocator : entry.getValue()) {
// Notify known remote locator about the advertised locator.
LocatorJoinMessage advertiseNewLocatorMessage = new
LocatorJoinMessage(
joiningLocatorDistributedSystemId, joiningLocator,
localLocatorId, "");
- sendMessage(value, advertiseNewLocatorMessage, failedMessages);
+ sendMessage(remoteLocator, advertiseNewLocatorMessage,
failedMessages);
// Notify the advertised locator about remote known locator.
LocatorJoinMessage advertiseKnownLocatorMessage =
- new LocatorJoinMessage(entry.getKey(), value, localLocatorId,
"");
+ new LocatorJoinMessage(entry.getKey(), remoteLocator,
localLocatorId, "");
sendMessage(joiningLocator, advertiseKnownLocatorMessage,
failedMessages);
}
}
@@ -317,9 +318,11 @@ public class LocatorMembershipListenerImpl implements
LocatorMembershipListener
DistributionLocatorId targetLocator = entry.getKey();
Set<LocatorJoinMessage> joinMessages = entry.getValue();
- for (LocatorJoinMessage locatorJoinMessage : joinMessages) {
+ for (Iterator<LocatorJoinMessage> iterator =
joinMessages.iterator(); iterator
+ .hasNext();) {
+ LocatorJoinMessage locatorJoinMessage = iterator.next();
if (retryMessage(targetLocator, locatorJoinMessage, attempt)) {
- joinMessages.remove(locatorJoinMessage);
+ iterator.remove();
} else {
// Sleep between retries.
try {
@@ -328,6 +331,7 @@ public class LocatorMembershipListenerImpl implements
LocatorMembershipListener
Thread.currentThread().interrupt();
logger.warn(
"Locator Membership listener permanently failed to
exchange locator information due to interruption.");
+ return;
}
}
}
diff --git
a/geode-wan/src/test/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerTest.java
b/geode-wan/src/test/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerTest.java
index d282128..2869d4d 100644
---
a/geode-wan/src/test/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerTest.java
+++
b/geode-wan/src/test/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerTest.java
@@ -21,6 +21,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -46,7 +47,9 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.contrib.java.lang.system.SystemOutRule;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.tcpserver.HostAndPort;
@@ -55,9 +58,13 @@ import
org.apache.geode.internal.admin.remote.DistributionLocatorId;
import org.apache.geode.test.junit.ResultCaptor;
public class LocatorMembershipListenerTest {
+ public static final int TIMEOUT = 500;
private TcpClient tcpClient;
private LocatorMembershipListenerImpl locatorMembershipListener;
+ @Rule
+ public SystemOutRule systemOutRule = new SystemOutRule();
+
private DistributionLocatorId buildDistributionLocatorId(int port) {
return new DistributionLocatorId("localhost[" + port + "]");
}
@@ -86,10 +93,10 @@ public class LocatorMembershipListenerTest {
throws IOException, ClassNotFoundException {
verify(tcpClient).requestToServer(initialTargetLocator.getHost(),
new LocatorJoinMessage(advertisedLocatorDsId, advertisedLocator,
sourceLocator, ""),
- DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+ TIMEOUT, false);
verify(tcpClient).requestToServer(advertisedLocator.getHost(),
new LocatorJoinMessage(initialTargetLocatorDsId, initialTargetLocator,
sourceLocator, ""),
- DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+ TIMEOUT, false);
}
private void joinLocatorsDistributorThread(ResultCaptor<Thread>
resultCaptor) {
@@ -103,7 +110,7 @@ public class LocatorMembershipListenerTest {
DistributionConfig distributionConfig = mock(DistributionConfig.class);
when(distributionConfig.getStartLocator()).thenReturn(DistributionConfig.DEFAULT_START_LOCATOR);
when(distributionConfig.getMemberTimeout())
- .thenReturn(DistributionConfig.DEFAULT_MEMBER_TIMEOUT);
+ .thenReturn(TIMEOUT);
tcpClient = mock(TcpClient.class);
locatorMembershipListener = spy(new
LocatorMembershipListenerImpl(tcpClient));
@@ -275,7 +282,7 @@ public class LocatorMembershipListenerTest {
when(locatorMembershipListener.getAllLocatorsInfo()).thenReturn(allLocatorsInfo);
when(tcpClient.requestToServer(locator3Site1.getHost(),
new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
- DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false))
+ TIMEOUT, false))
.thenThrow(new EOFException("Mock Exception"));
ResultCaptor<Thread> resultCaptor = new ResultCaptor<>();
@@ -287,39 +294,70 @@ public class LocatorMembershipListenerTest {
verify(tcpClient, times(LOCATOR_DISTRIBUTION_RETRY_ATTEMPTS +
1)).requestToServer(
locator3Site1.getHost(),
new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
- DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+ TIMEOUT, false);
verify(tcpClient).requestToServer(joiningLocator.getHost(),
new LocatorJoinMessage(1, locator3Site1, locator1Site1, ""),
- DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+ TIMEOUT, false);
}
@Test
public void
locatorJoinedShouldNotRetryAgainAfterSuccessfulRetryOnConnectionFailures()
throws IOException, ClassNotFoundException {
+ systemOutRule.enableLog();
ConcurrentMap<Integer, Set<DistributionLocatorId>> allLocatorsInfo = new
ConcurrentHashMap<>();
+ DistributionLocatorId localLocatorID = buildDistributionLocatorId(10101);
DistributionLocatorId joiningLocator = buildDistributionLocatorId(10102);
- DistributionLocatorId locator1Site1 = buildDistributionLocatorId(10101);
- DistributionLocatorId locator3Site1 = buildDistributionLocatorId(10103);
- allLocatorsInfo.put(1, new
HashSet<>(Collections.singletonList(locator3Site1)));
+ DistributionLocatorId remoteLocator1 = buildDistributionLocatorId(10103);
+ DistributionLocatorId remoteLocator2 = buildDistributionLocatorId(10104);
+ final HashSet<DistributionLocatorId> remoteLocators =
+ new HashSet<>(Arrays.asList(new DistributionLocatorId[]
{remoteLocator1, remoteLocator2}));
+ allLocatorsInfo.put(1, remoteLocators);
when(locatorMembershipListener.getAllLocatorsInfo()).thenReturn(allLocatorsInfo);
- when(tcpClient.requestToServer(locator3Site1.getHost(),
- new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
- DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false))
+ // have messaging fail twice so that LocatorMembershipListenerImpl's
retryMessage logic is
+ // exercised
+ when(tcpClient.requestToServer(remoteLocator1.getHost(),
+ new LocatorJoinMessage(1, joiningLocator, localLocatorID, ""),
+ TIMEOUT, false))
+ .thenThrow(new EOFException("Test Exception"))
+ .thenThrow(new EOFException("Test Exception"))
+ .thenReturn(null);
+ when(tcpClient.requestToServer(remoteLocator2.getHost(),
+ new LocatorJoinMessage(1, joiningLocator, localLocatorID, ""),
+ TIMEOUT, false))
+ .thenThrow(new EOFException("Test Exception"))
+ .thenThrow(new EOFException("Test Exception"))
+ .thenReturn(null);
+ when(tcpClient.requestToServer(joiningLocator.getHost(),
+ new LocatorJoinMessage(1, remoteLocator1, localLocatorID, ""),
+ TIMEOUT, false))
+ .thenThrow(new EOFException("Test Exception"))
+ .thenThrow(new EOFException("Test Exception"))
+ .thenReturn(null);
+ // also have the joining locator fail to receive messages so we can test
that code path.
+ // It will fail to receive messages informing it of remoteLocator1 and
remoteLocator2, so it
+ // will have
+ // two failed messages to retry. The others will each have one message to
retry, informing
+ // them about the joiningLocator.
+ when(tcpClient.requestToServer(joiningLocator.getHost(),
+ new LocatorJoinMessage(1, remoteLocator2, localLocatorID, ""),
+ TIMEOUT, false))
+ .thenThrow(new EOFException("Mock Exception"))
.thenThrow(new EOFException("Mock Exception"))
.thenReturn(null);
ResultCaptor<Thread> resultCaptor = new ResultCaptor<>();
doAnswer(resultCaptor).when(locatorMembershipListener).buildLocatorsDistributorThread(
any(DistributionLocatorId.class), anyMap(),
any(DistributionLocatorId.class), anyInt());
- locatorMembershipListener.locatorJoined(1, joiningLocator, locator1Site1);
+ locatorMembershipListener.locatorJoined(1, joiningLocator, localLocatorID);
joinLocatorsDistributorThread(resultCaptor);
- verify(tcpClient, times(2)).requestToServer(locator3Site1.getHost(),
- new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
- DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
- verify(tcpClient).requestToServer(joiningLocator.getHost(),
- new LocatorJoinMessage(1, locator3Site1, locator1Site1, ""),
- DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+
assertThat(systemOutRule.getLog()).doesNotContain("ConcurrentModificationException");
+
+ // The sendMessage loop in the listener will try to send 4 messages. Two
to the remoteLocators
+ // and two to the joiningLocator. The retry loop will try to send the
messages again and
+ // fail (4 more messages) and then it will succeed (4 more messages, for a
total of 12).
+ verify(tcpClient, times(12)).requestToServer(isA(HostAndPort.class),
+ isA(LocatorJoinMessage.class), isA(Integer.class), isA(Boolean.class));
}
@Test
@@ -339,14 +377,14 @@ public class LocatorMembershipListenerTest {
// Fail on first 2 attempts and succeed on third attempt.
when(tcpClient.requestToServer(locator3Site1.getHost(),
new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
- DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false))
+ TIMEOUT, false))
.thenThrow(new EOFException("Mock Exception"))
.thenThrow(new EOFException("Mock Exception")).thenReturn(null);
// Fail always.
when(tcpClient.requestToServer(joiningLocator.getHost(),
new LocatorJoinMessage(3, locator1Site3, locator1Site1, ""),
- DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false))
+ TIMEOUT, false))
.thenThrow(new EOFException("Mock Exception"));
ResultCaptor<Thread> resultCaptor = new ResultCaptor<>();
@@ -358,17 +396,17 @@ public class LocatorMembershipListenerTest {
verifyMessagesSentBothWays(locator1Site1, 1, joiningLocator, 2,
locator1Site2);
verify(tcpClient, times(3)).requestToServer(locator3Site1.getHost(),
new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
- DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+ TIMEOUT, false);
verify(tcpClient).requestToServer(joiningLocator.getHost(),
new LocatorJoinMessage(1, locator3Site1, locator1Site1, ""),
- DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+ TIMEOUT, false);
verify(tcpClient).requestToServer(locator1Site3.getHost(),
new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
- DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+ TIMEOUT, false);
verify(tcpClient, times(LOCATOR_DISTRIBUTION_RETRY_ATTEMPTS +
1)).requestToServer(
joiningLocator.getHost(),
new LocatorJoinMessage(3, locator1Site3, locator1Site1, ""),
- DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+ TIMEOUT, false);
}
private static class HandlerCallable implements Callable<Object> {