This is an automated email from the ASF dual-hosted git repository.

agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b05d7f  IGNITE-12902 Fix concurrent modification exception when iterating over exchange events - Fixes #7675.
6b05d7f is described below

commit 6b05d7f2cc9cc669a0b7f9ded06d1138c111317b
Author: ktkalenko <[email protected]>
AuthorDate: Mon Apr 27 19:18:00 2020 +0300

    IGNITE-12902 Fix concurrent modification exception when iterating over exchange events - Fixes #7675.
    
    Signed-off-by: Alexey Goncharuk <[email protected]>
---
 .../processors/cache/ExchangeDiscoveryEvents.java  |  19 ++--
 .../dht/topology/GridClientPartitionTopology.java  |   6 +-
 .../dht/topology/GridDhtPartitionTopologyImpl.java |   8 +-
 .../cache/distributed/CacheExchangeMergeTest.java  | 104 ++++++++++++++++++++-
 4 files changed, 113 insertions(+), 24 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
index a0c2dec..bf7b4ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
@@ -18,9 +18,10 @@
 package org.apache.ignite.internal.processors.cache;
 
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.CacheEvent;
@@ -58,13 +59,13 @@ public class ExchangeDiscoveryEvents {
     private DiscoveryEvent lastSrvEvt;
 
     /** All events. */
-    private List<DiscoveryEvent> evts = Collections.synchronizedList(new 
ArrayList<>());
+    private Collection<DiscoveryEvent> evts = new ConcurrentLinkedQueue<>();
 
     /** Joined server nodes. */
-    private List<ClusterNode> joinedSrvNodes = 
Collections.synchronizedList(new ArrayList<>());
+    private Collection<ClusterNode> joinedSrvNodes = new 
ConcurrentLinkedQueue<>();
 
     /** Left server nodes. */
-    private List<ClusterNode> leftSrvNodes = Collections.synchronizedList(new 
ArrayList<>());
+    private Collection<ClusterNode> leftSrvNodes = new 
ConcurrentLinkedQueue<>();
 
     /**
      * @param fut Current exchange future.
@@ -86,9 +87,7 @@ public class ExchangeDiscoveryEvents {
      * @return {@code True} if has join event for give node.
      */
     public boolean nodeJoined(UUID nodeId) {
-        for (int i = 0; i < evts.size(); i++) {
-            DiscoveryEvent evt = evts.get(i);
-
+        for (DiscoveryEvent evt : evts) {
             if (evt.type() == EVT_NODE_JOINED && 
nodeId.equals(evt.eventNode().id()))
                 return true;
         }
@@ -136,7 +135,7 @@ public class ExchangeDiscoveryEvents {
     /**
      * @return All events.
      */
-    public List<DiscoveryEvent> events() {
+    public Collection<DiscoveryEvent> events() {
         return evts;
     }
 
@@ -194,14 +193,14 @@ public class ExchangeDiscoveryEvents {
     /**
      *
      */
-    public List<ClusterNode> joinedServerNodes() {
+    public Collection<ClusterNode> joinedServerNodes() {
         return joinedSrvNodes;
     }
 
     /**
      *
      */
-    public List<ClusterNode> leftServerNodes() {
+    public Collection<ClusterNode> leftServerNodes() {
         return leftSrvNodes;
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
index 7a988a5..e2fdf3c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
@@ -355,11 +355,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
         GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
 
         if (exchFut.context().events().hasServerLeft()) {
-            List<DiscoveryEvent> evts0 = exchFut.context().events().events();
-
-            for (int i = 0; i < evts0.size(); i++) {
-                DiscoveryEvent evt = evts0.get(i);
-
+            for (DiscoveryEvent evt : exchFut.context().events().events()) {
                 if (ExchangeDiscoveryEvents.serverLeftEvent(evt))
                     removeNode(evt.eventNode().id());
             }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index ac7a5b6..1a94737 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -568,11 +568,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 boolean grpStarted = 
exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom());
 
                 if (evts.hasServerLeft()) {
-                    List<DiscoveryEvent> evts0 = evts.events();
-
-                    for (int i = 0; i < evts0.size(); i++) {
-                        DiscoveryEvent evt = evts0.get(i);
-
+                    for (DiscoveryEvent evt : evts.events()) {
                         if (ExchangeDiscoveryEvents.serverLeftEvent(evt))
                             removeNode(evt.eventNode().id());
                     }
@@ -2276,7 +2272,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         GridDhtPartitionsExchangeFuture exchFut) {
         Map<UUID, Set<Integer>> res = new HashMap<>();
 
-        List<DiscoveryEvent> evts = exchFut.events().events();
+        Collection<DiscoveryEvent> evts = exchFut.events().events();
 
         Set<UUID> joinedNodes = U.newHashSet(evts.size());
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
index bc9a9c3..2308399 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
@@ -18,18 +18,22 @@
 package org.apache.ignite.internal.processors.cache.distributed;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.ConcurrentModificationException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
@@ -47,11 +51,13 @@ import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.IgnitionEx;
 import org.apache.ignite.internal.TestDelayingCommunicationSpi;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
 import 
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage;
@@ -68,6 +74,8 @@ import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
@@ -76,6 +84,7 @@ import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import static java.util.Objects.nonNull;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_EXCHANGE_HISTORY_SIZE;
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
@@ -85,6 +94,8 @@ import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static 
org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
 import static 
org.apache.ignite.testframework.GridTestUtils.mergeExchangeWaitVersion;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.LogListener.matches;
 
 /**
  *
@@ -111,9 +122,12 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
     /** */
     private static ExecutorService executor;
 
+    /** Logger for listen messages. */
+    private final ListeningTestLogger listeningLog = new 
ListeningTestLogger(false, log);
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+        IgniteConfiguration cfg = 
super.getConfiguration(igniteInstanceName).setGridLogger(listeningLog);
 
         if (testSpi)
             cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
@@ -161,6 +175,8 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
 
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
+        listeningLog.clearListeners();
+
         stopAllGrids();
 
         super.afterTest();
@@ -1577,10 +1593,92 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
     static class TestDelayExchangeMessagesSpi extends 
TestDelayingCommunicationSpi {
         /** {@inheritDoc} */
         @Override protected boolean delayMessage(Message msg, GridIoMessage 
ioMsg) {
-            if (msg instanceof GridDhtPartitionsAbstractMessage)
-                return ((GridDhtPartitionsAbstractMessage)msg).exchangeId() != 
null || (msg instanceof GridDhtPartitionsSingleRequest);
+            return delay(msg);
+        }
+    }
 
+    /**
+     * Return {@code true} if need to delay message to emulate merge exchanges.
+     *
+     * @param msg Message.
+     * @return {@code True} if need to delay message.
+     */
+    private static boolean delay(Message msg) {
+        if (!GridDhtPartitionsAbstractMessage.class.isInstance(msg))
             return false;
+
+        GridDhtPartitionsAbstractMessage dhtMsg = 
(GridDhtPartitionsAbstractMessage)msg;
+        return nonNull(dhtMsg.exchangeId()) || 
GridDhtPartitionsSingleRequest.class.isInstance(dhtMsg);
+    }
+
+    /**
+     * Test checks that there will be no {@link 
ConcurrentModificationException}
+     * when merging exchanges and iterating over {@link 
ExchangeDiscoveryEvents#events} at the same time.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testNoConcurrentModificationExceptionAfterMergeExchanges() 
throws Exception {
+        testSpi = true;
+
+        LogListener logLsnr = matches("Merge exchange future on finish 
[").build();
+        listeningLog.registerAllListeners(logLsnr);
+
+        AtomicBoolean stop = new AtomicBoolean();
+        Collection<Exception> exceptions = new ConcurrentLinkedQueue<>();
+
+        try {
+            startGrid(0);
+
+            for (int i = 1; i < 9; i++) {
+                IgniteConfiguration cfg = 
getConfiguration(getTestIgniteInstanceName(i));
+                TestRecordingCommunicationSpi spi = 
((TestRecordingCommunicationSpi)cfg.getCommunicationSpi());
+
+                spi.blockMessages((node, msg) -> delay(msg));
+                runAsync(() -> startGrid(cfg), "create-node-" + 
cfg.getIgniteInstanceName());
+                spi.waitForBlocked();
+            }
+
+            List<Ignite> allNodes = IgnitionEx.allGridsx();
+            CountDownLatch latch = new CountDownLatch(allNodes.size());
+
+            for (Ignite gridEx : allNodes) {
+                runAsync(() -> {
+                    Collection<DiscoveryEvent> evts = 
((IgniteEx)gridEx).context().cache().context().exchange()
+                        .lastTopologyFuture().events().events();
+
+                    latch.countDown();
+
+                    int i = 0;
+                    while (!stop.get()) {
+                        try {
+                            for (DiscoveryEvent evt : evts) {
+                                if (nonNull(evt))
+                                    i++;
+                            }
+                        }
+                        catch (ConcurrentModificationException e) {
+                            exceptions.add(e);
+
+                            log.error("i = " + i, e);
+
+                            break;
+                        }
+                    }
+                }, "get-ex-" + gridEx.configuration().getIgniteInstanceName());
+            }
+
+            for (Ignite node : allNodes)
+                TestRecordingCommunicationSpi.spi(node).stopBlock();
+
+            latch.await();
+            awaitPartitionMapExchange();
         }
+        finally {
+            stop.set(true);
+        }
+
+        assertTrue(logLsnr.check());
+        assertTrue(exceptions.isEmpty());
     }
 }

Reply via email to