This is an automated email from the ASF dual-hosted git repository.
agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 6b05d7f IGNITE-12902 Fix concurrent modification exception when
iterating over exchange events - Fixes #7675.
6b05d7f is described below
commit 6b05d7f2cc9cc669a0b7f9ded06d1138c111317b
Author: ktkalenko <[email protected]>
AuthorDate: Mon Apr 27 19:18:00 2020 +0300
IGNITE-12902 Fix concurrent modification exception when iterating over
exchange events - Fixes #7675.
Signed-off-by: Alexey Goncharuk <[email protected]>
---
.../processors/cache/ExchangeDiscoveryEvents.java | 19 ++--
.../dht/topology/GridClientPartitionTopology.java | 6 +-
.../dht/topology/GridDhtPartitionTopologyImpl.java | 8 +-
.../cache/distributed/CacheExchangeMergeTest.java | 104 ++++++++++++++++++++-
4 files changed, 113 insertions(+), 24 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
index a0c2dec..bf7b4ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
@@ -18,9 +18,10 @@
package org.apache.ignite.internal.processors.cache;
import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Collection;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.CacheEvent;
@@ -58,13 +59,13 @@ public class ExchangeDiscoveryEvents {
private DiscoveryEvent lastSrvEvt;
/** All events. */
- private List<DiscoveryEvent> evts = Collections.synchronizedList(new ArrayList<>());
+ private Collection<DiscoveryEvent> evts = new ConcurrentLinkedQueue<>();
/** Joined server nodes. */
- private List<ClusterNode> joinedSrvNodes = Collections.synchronizedList(new ArrayList<>());
+ private Collection<ClusterNode> joinedSrvNodes = new ConcurrentLinkedQueue<>();
/** Left server nodes. */
- private List<ClusterNode> leftSrvNodes = Collections.synchronizedList(new ArrayList<>());
+ private Collection<ClusterNode> leftSrvNodes = new ConcurrentLinkedQueue<>();
/**
* @param fut Current exchange future.
@@ -86,9 +87,7 @@ public class ExchangeDiscoveryEvents {
* @return {@code True} if has join event for give node.
*/
public boolean nodeJoined(UUID nodeId) {
- for (int i = 0; i < evts.size(); i++) {
- DiscoveryEvent evt = evts.get(i);
-
+ for (DiscoveryEvent evt : evts) {
if (evt.type() == EVT_NODE_JOINED && nodeId.equals(evt.eventNode().id()))
return true;
}
@@ -136,7 +135,7 @@ public class ExchangeDiscoveryEvents {
/**
* @return All events.
*/
- public List<DiscoveryEvent> events() {
+ public Collection<DiscoveryEvent> events() {
return evts;
}
@@ -194,14 +193,14 @@ public class ExchangeDiscoveryEvents {
/**
*
*/
- public List<ClusterNode> joinedServerNodes() {
+ public Collection<ClusterNode> joinedServerNodes() {
return joinedSrvNodes;
}
/**
*
*/
- public List<ClusterNode> leftServerNodes() {
+ public Collection<ClusterNode> leftServerNodes() {
return leftSrvNodes;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
index 7a988a5..e2fdf3c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
@@ -355,11 +355,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
if (exchFut.context().events().hasServerLeft()) {
- List<DiscoveryEvent> evts0 = exchFut.context().events().events();
-
- for (int i = 0; i < evts0.size(); i++) {
- DiscoveryEvent evt = evts0.get(i);
-
+ for (DiscoveryEvent evt : exchFut.context().events().events()) {
if (ExchangeDiscoveryEvents.serverLeftEvent(evt))
removeNode(evt.eventNode().id());
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index ac7a5b6..1a94737 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -568,11 +568,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
boolean grpStarted = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom());
if (evts.hasServerLeft()) {
- List<DiscoveryEvent> evts0 = evts.events();
-
- for (int i = 0; i < evts0.size(); i++) {
- DiscoveryEvent evt = evts0.get(i);
-
+ for (DiscoveryEvent evt : evts.events()) {
if (ExchangeDiscoveryEvents.serverLeftEvent(evt))
removeNode(evt.eventNode().id());
}
@@ -2276,7 +2272,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
GridDhtPartitionsExchangeFuture exchFut) {
Map<UUID, Set<Integer>> res = new HashMap<>();
- List<DiscoveryEvent> evts = exchFut.events().events();
+ Collection<DiscoveryEvent> evts = exchFut.events().events();
Set<UUID> joinedNodes = U.newHashSet(evts.size());
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
index bc9a9c3..2308399 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
@@ -18,18 +18,22 @@
package org.apache.ignite.internal.processors.cache.distributed;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
@@ -47,11 +51,13 @@ import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.TestDelayingCommunicationSpi;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage;
@@ -68,6 +74,8 @@ import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
@@ -76,6 +84,7 @@ import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
+import static java.util.Objects.nonNull;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_EXCHANGE_HISTORY_SIZE;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
@@ -85,6 +94,8 @@ import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
import static org.apache.ignite.testframework.GridTestUtils.mergeExchangeWaitVersion;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.LogListener.matches;
/**
*
@@ -111,9 +122,12 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
/** */
private static ExecutorService executor;
+ /** Logger for listen messages. */
+ private final ListeningTestLogger listeningLog = new ListeningTestLogger(false, log);
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName).setGridLogger(listeningLog);
if (testSpi)
cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
@@ -161,6 +175,8 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
+ listeningLog.clearListeners();
+
stopAllGrids();
super.afterTest();
@@ -1577,10 +1593,92 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
static class TestDelayExchangeMessagesSpi extends TestDelayingCommunicationSpi {
/** {@inheritDoc} */
@Override protected boolean delayMessage(Message msg, GridIoMessage ioMsg) {
- if (msg instanceof GridDhtPartitionsAbstractMessage)
- return ((GridDhtPartitionsAbstractMessage)msg).exchangeId() != null || (msg instanceof GridDhtPartitionsSingleRequest);
+ return delay(msg);
+ }
+ }
+ /**
+ * Return {@code true} if need to delay message to emulate merge exchanges.
+ *
+ * @param msg Message.
+ * @return {@code True} if need to delay message.
+ */
+ private static boolean delay(Message msg) {
+ if (!GridDhtPartitionsAbstractMessage.class.isInstance(msg))
return false;
+
+ GridDhtPartitionsAbstractMessage dhtMsg = (GridDhtPartitionsAbstractMessage)msg;
+ return nonNull(dhtMsg.exchangeId()) || GridDhtPartitionsSingleRequest.class.isInstance(dhtMsg);
+ }
+
+ /**
+ * Test checks that there will be no {@link ConcurrentModificationException}
+ * when merging exchanges and iterating over {@link ExchangeDiscoveryEvents#events} at the same time.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testNoConcurrentModificationExceptionAfterMergeExchanges() throws Exception {
+ testSpi = true;
+
+ LogListener logLsnr = matches("Merge exchange future on finish [").build();
+ listeningLog.registerAllListeners(logLsnr);
+
+ AtomicBoolean stop = new AtomicBoolean();
+ Collection<Exception> exceptions = new ConcurrentLinkedQueue<>();
+
+ try {
+ startGrid(0);
+
+ for (int i = 1; i < 9; i++) {
+ IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(i));
+ TestRecordingCommunicationSpi spi = ((TestRecordingCommunicationSpi)cfg.getCommunicationSpi());
+
+ spi.blockMessages((node, msg) -> delay(msg));
+ runAsync(() -> startGrid(cfg), "create-node-" + cfg.getIgniteInstanceName());
+ spi.waitForBlocked();
+ }
+
+ List<Ignite> allNodes = IgnitionEx.allGridsx();
+ CountDownLatch latch = new CountDownLatch(allNodes.size());
+
+ for (Ignite gridEx : allNodes) {
+ runAsync(() -> {
+ Collection<DiscoveryEvent> evts = ((IgniteEx)gridEx).context().cache().context().exchange()
+ .lastTopologyFuture().events().events();
+
+ latch.countDown();
+
+ int i = 0;
+ while (!stop.get()) {
+ try {
+ for (DiscoveryEvent evt : evts) {
+ if (nonNull(evt))
+ i++;
+ }
+ }
+ catch (ConcurrentModificationException e) {
+ exceptions.add(e);
+
+ log.error("i = " + i, e);
+
+ break;
+ }
+ }
+ }, "get-ex-" + gridEx.configuration().getIgniteInstanceName());
+ }
+
+ for (Ignite node : allNodes)
+ TestRecordingCommunicationSpi.spi(node).stopBlock();
+
+ latch.await();
+ awaitPartitionMapExchange();
}
+ finally {
+ stop.set(true);
+ }
+
+ assertTrue(logLsnr.check());
+ assertTrue(exceptions.isEmpty());
}
}