This is an automated email from the ASF dual-hosted git repository. bschuchardt pushed a commit to branch support/1.13 in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.13 by this push: new 64b727e GEODE-8195: ConcurrentModificationException from LocatorMembershipListenerImpl (#5306) 64b727e is described below commit 64b727e105117d806b6e680ec5f2b0f9bbce0afd Author: Bruce Schuchardt <bschucha...@pivotal.io> AuthorDate: Fri Jun 26 07:47:23 2020 -0700 GEODE-8195: ConcurrentModificationException from LocatorMembershipListenerImpl (#5306) I've replaced the "for" loop that used an implicit Iterator with one that uses an explicit Iterator, so that the Iterator's safe "remove()" method can be used. Iterator.remove() is documented as the only safe way to modify a collection while iterating over its contents. I've also modified a test to validate the fix. The test forces a failure to send two messages to an address. Those failures are then handled by the code that was throwing the ConcurrentModificationException and, since there are two failures, two removals are performed on the failedMessages collection. (cherry picked from commit 3cda1b1a213f2195ff0b97361883f6a6c3972b14) --- .../locator/wan/LocatorMembershipListenerImpl.java | 14 ++-- .../locator/wan/LocatorMembershipListenerTest.java | 88 ++++++++++++++++------ 2 files changed, 72 insertions(+), 30 deletions(-) diff --git a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java index acb4f0a..253f6dc 100644 --- a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java +++ b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java @@ -18,6 +18,7 @@ package org.apache.geode.cache.client.internal.locator.wan; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import 
java.util.Set; @@ -295,15 +296,15 @@ public class LocatorMembershipListenerImpl implements LocatorMembershipListener public void run() { Map<DistributionLocatorId, Set<LocatorJoinMessage>> failedMessages = new HashMap<>(); for (Map.Entry<Integer, Set<DistributionLocatorId>> entry : remoteLocators.entrySet()) { - for (DistributionLocatorId value : entry.getValue()) { + for (DistributionLocatorId remoteLocator : entry.getValue()) { // Notify known remote locator about the advertised locator. LocatorJoinMessage advertiseNewLocatorMessage = new LocatorJoinMessage( joiningLocatorDistributedSystemId, joiningLocator, localLocatorId, ""); - sendMessage(value, advertiseNewLocatorMessage, failedMessages); + sendMessage(remoteLocator, advertiseNewLocatorMessage, failedMessages); // Notify the advertised locator about remote known locator. LocatorJoinMessage advertiseKnownLocatorMessage = - new LocatorJoinMessage(entry.getKey(), value, localLocatorId, ""); + new LocatorJoinMessage(entry.getKey(), remoteLocator, localLocatorId, ""); sendMessage(joiningLocator, advertiseKnownLocatorMessage, failedMessages); } } @@ -317,9 +318,11 @@ public class LocatorMembershipListenerImpl implements LocatorMembershipListener DistributionLocatorId targetLocator = entry.getKey(); Set<LocatorJoinMessage> joinMessages = entry.getValue(); - for (LocatorJoinMessage locatorJoinMessage : joinMessages) { + for (Iterator<LocatorJoinMessage> iterator = joinMessages.iterator(); iterator + .hasNext();) { + LocatorJoinMessage locatorJoinMessage = iterator.next(); if (retryMessage(targetLocator, locatorJoinMessage, attempt)) { - joinMessages.remove(locatorJoinMessage); + iterator.remove(); } else { // Sleep between retries. 
try { @@ -328,6 +331,7 @@ public class LocatorMembershipListenerImpl implements LocatorMembershipListener Thread.currentThread().interrupt(); logger.warn( "Locator Membership listener permanently failed to exchange locator information due to interruption."); + return; } } } diff --git a/geode-wan/src/test/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerTest.java b/geode-wan/src/test/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerTest.java index d282128..2869d4d 100644 --- a/geode-wan/src/test/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerTest.java +++ b/geode-wan/src/test/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerTest.java @@ -21,6 +21,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -46,7 +47,9 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.contrib.java.lang.system.SystemOutRule; import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.distributed.internal.tcpserver.HostAndPort; @@ -55,9 +58,13 @@ import org.apache.geode.internal.admin.remote.DistributionLocatorId; import org.apache.geode.test.junit.ResultCaptor; public class LocatorMembershipListenerTest { + public static final int TIMEOUT = 500; private TcpClient tcpClient; private LocatorMembershipListenerImpl locatorMembershipListener; + @Rule + public SystemOutRule systemOutRule = new SystemOutRule(); + private DistributionLocatorId buildDistributionLocatorId(int port) { return new 
DistributionLocatorId("localhost[" + port + "]"); } @@ -86,10 +93,10 @@ public class LocatorMembershipListenerTest { throws IOException, ClassNotFoundException { verify(tcpClient).requestToServer(initialTargetLocator.getHost(), new LocatorJoinMessage(advertisedLocatorDsId, advertisedLocator, sourceLocator, ""), - DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false); + TIMEOUT, false); verify(tcpClient).requestToServer(advertisedLocator.getHost(), new LocatorJoinMessage(initialTargetLocatorDsId, initialTargetLocator, sourceLocator, ""), - DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false); + TIMEOUT, false); } private void joinLocatorsDistributorThread(ResultCaptor<Thread> resultCaptor) { @@ -103,7 +110,7 @@ public class LocatorMembershipListenerTest { DistributionConfig distributionConfig = mock(DistributionConfig.class); when(distributionConfig.getStartLocator()).thenReturn(DistributionConfig.DEFAULT_START_LOCATOR); when(distributionConfig.getMemberTimeout()) - .thenReturn(DistributionConfig.DEFAULT_MEMBER_TIMEOUT); + .thenReturn(TIMEOUT); tcpClient = mock(TcpClient.class); locatorMembershipListener = spy(new LocatorMembershipListenerImpl(tcpClient)); @@ -275,7 +282,7 @@ public class LocatorMembershipListenerTest { when(locatorMembershipListener.getAllLocatorsInfo()).thenReturn(allLocatorsInfo); when(tcpClient.requestToServer(locator3Site1.getHost(), new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""), - DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false)) + TIMEOUT, false)) .thenThrow(new EOFException("Mock Exception")); ResultCaptor<Thread> resultCaptor = new ResultCaptor<>(); @@ -287,39 +294,70 @@ public class LocatorMembershipListenerTest { verify(tcpClient, times(LOCATOR_DISTRIBUTION_RETRY_ATTEMPTS + 1)).requestToServer( locator3Site1.getHost(), new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""), - DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false); + TIMEOUT, false); verify(tcpClient).requestToServer(joiningLocator.getHost(), new 
LocatorJoinMessage(1, locator3Site1, locator1Site1, ""), - DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false); + TIMEOUT, false); } @Test public void locatorJoinedShouldNotRetryAgainAfterSuccessfulRetryOnConnectionFailures() throws IOException, ClassNotFoundException { + systemOutRule.enableLog(); ConcurrentMap<Integer, Set<DistributionLocatorId>> allLocatorsInfo = new ConcurrentHashMap<>(); + DistributionLocatorId localLocatorID = buildDistributionLocatorId(10101); DistributionLocatorId joiningLocator = buildDistributionLocatorId(10102); - DistributionLocatorId locator1Site1 = buildDistributionLocatorId(10101); - DistributionLocatorId locator3Site1 = buildDistributionLocatorId(10103); - allLocatorsInfo.put(1, new HashSet<>(Collections.singletonList(locator3Site1))); + DistributionLocatorId remoteLocator1 = buildDistributionLocatorId(10103); + DistributionLocatorId remoteLocator2 = buildDistributionLocatorId(10104); + final HashSet<DistributionLocatorId> remoteLocators = + new HashSet<>(Arrays.asList(new DistributionLocatorId[] {remoteLocator1, remoteLocator2})); + allLocatorsInfo.put(1, remoteLocators); when(locatorMembershipListener.getAllLocatorsInfo()).thenReturn(allLocatorsInfo); - when(tcpClient.requestToServer(locator3Site1.getHost(), - new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""), - DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false)) + // have messaging fail twice so that LocatorMembershipListenerImpl's retryMessage logic is + // exercised + when(tcpClient.requestToServer(remoteLocator1.getHost(), + new LocatorJoinMessage(1, joiningLocator, localLocatorID, ""), + TIMEOUT, false)) + .thenThrow(new EOFException("Test Exception")) + .thenThrow(new EOFException("Test Exception")) + .thenReturn(null); + when(tcpClient.requestToServer(remoteLocator2.getHost(), + new LocatorJoinMessage(1, joiningLocator, localLocatorID, ""), + TIMEOUT, false)) + .thenThrow(new EOFException("Test Exception")) + .thenThrow(new EOFException("Test Exception")) + 
.thenReturn(null); + when(tcpClient.requestToServer(joiningLocator.getHost(), + new LocatorJoinMessage(1, remoteLocator1, localLocatorID, ""), + TIMEOUT, false)) + .thenThrow(new EOFException("Test Exception")) + .thenThrow(new EOFException("Test Exception")) + .thenReturn(null); + // also have the joining locator fail to receive messages so we can test that code path. + // It will fail to receive messages informing it of remoteLocator1 and remoteLocator2, so it + // will have + // two failed messages to retry. The others will each have one message to retry, informing + // them about the joiningLocator. + when(tcpClient.requestToServer(joiningLocator.getHost(), + new LocatorJoinMessage(1, remoteLocator2, localLocatorID, ""), + TIMEOUT, false)) + .thenThrow(new EOFException("Mock Exception")) .thenThrow(new EOFException("Mock Exception")) .thenReturn(null); ResultCaptor<Thread> resultCaptor = new ResultCaptor<>(); doAnswer(resultCaptor).when(locatorMembershipListener).buildLocatorsDistributorThread( any(DistributionLocatorId.class), anyMap(), any(DistributionLocatorId.class), anyInt()); - locatorMembershipListener.locatorJoined(1, joiningLocator, locator1Site1); + locatorMembershipListener.locatorJoined(1, joiningLocator, localLocatorID); joinLocatorsDistributorThread(resultCaptor); - verify(tcpClient, times(2)).requestToServer(locator3Site1.getHost(), - new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""), - DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false); - verify(tcpClient).requestToServer(joiningLocator.getHost(), - new LocatorJoinMessage(1, locator3Site1, locator1Site1, ""), - DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false); + assertThat(systemOutRule.getLog()).doesNotContain("ConcurrentModificationException"); + + // The sendMessage loop in the listener will try to send 4 messages. Two to the remoteLocators + // and two to the joiningLocator. 
The retry loop will try to send the messages again and + // fail (4 more messages) and then it will succeed (4 more messages, for a total of 12). + verify(tcpClient, times(12)).requestToServer(isA(HostAndPort.class), + isA(LocatorJoinMessage.class), isA(Integer.class), isA(Boolean.class)); } @Test @@ -339,14 +377,14 @@ public class LocatorMembershipListenerTest { // Fail on first 2 attempts and succeed on third attempt. when(tcpClient.requestToServer(locator3Site1.getHost(), new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""), - DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false)) + TIMEOUT, false)) .thenThrow(new EOFException("Mock Exception")) .thenThrow(new EOFException("Mock Exception")).thenReturn(null); // Fail always. when(tcpClient.requestToServer(joiningLocator.getHost(), new LocatorJoinMessage(3, locator1Site3, locator1Site1, ""), - DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false)) + TIMEOUT, false)) .thenThrow(new EOFException("Mock Exception")); ResultCaptor<Thread> resultCaptor = new ResultCaptor<>(); @@ -358,17 +396,17 @@ public class LocatorMembershipListenerTest { verifyMessagesSentBothWays(locator1Site1, 1, joiningLocator, 2, locator1Site2); verify(tcpClient, times(3)).requestToServer(locator3Site1.getHost(), new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""), - DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false); + TIMEOUT, false); verify(tcpClient).requestToServer(joiningLocator.getHost(), new LocatorJoinMessage(1, locator3Site1, locator1Site1, ""), - DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false); + TIMEOUT, false); verify(tcpClient).requestToServer(locator1Site3.getHost(), new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""), - DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false); + TIMEOUT, false); verify(tcpClient, times(LOCATOR_DISTRIBUTION_RETRY_ATTEMPTS + 1)).requestToServer( joiningLocator.getHost(), new LocatorJoinMessage(3, locator1Site3, locator1Site1, ""), - DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false); + 
TIMEOUT, false); } private static class HandlerCallable implements Callable<Object> {