Repository: ignite
Updated Branches:
  refs/heads/ignite-3478 7f4defd09 -> 761e43d30


http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 6b01aef..1b70747 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.mvcc;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
@@ -55,11 +56,15 @@ import org.apache.ignite.internal.util.lang.GridInClosure3;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -68,6 +73,7 @@ import 
org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
+import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
@@ -79,6 +85,7 @@ import static 
org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA
 /**
  * TODO IGNITE-3478: extend tests to use single/mutiple nodes, all tx types.
  * TODO IGNITE-3478: test with cache groups.
+ * TODO IGNITE-3478: add check for cleanup in all tests (at the end do update 
for all keys, check there are 2 versions left).
  */
 @SuppressWarnings("unchecked")
 public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
@@ -89,6 +96,9 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
     private static final int DFLT_PARTITION_COUNT = 
RendezvousAffinityFunction.DFLT_PARTITION_COUNT;
 
     /** */
+    private static final String CRD_ATTR = "testCrd";
+
+    /** */
     private static final long DFLT_TEST_TIME = 30_000;
 
     /** */
@@ -100,6 +110,9 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
     /** */
     private boolean testSpi;
 
+    /** */
+    private String nodeAttr;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -115,6 +128,9 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
 
         cfg.setClientMode(client);
 
+        if (nodeAttr != null)
+            cfg.setUserAttributes(F.asMap(nodeAttr, true));
+
         return cfg;
     }
 
@@ -124,6 +140,13 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
     }
 
     /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        CacheCoordinatorsProcessor.coordinatorAssignClosure(null);
+    }
+
+    /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         try {
             verifyCoordinatorInternalState();
@@ -131,6 +154,10 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
         finally {
             stopAllGrids();
         }
+
+        CacheCoordinatorsProcessor.coordinatorAssignClosure(null);
+
+        super.afterTest();
     }
 
     /**
@@ -491,7 +518,7 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
 
         client = true;
 
-        final Ignite ignite = startGrid(3);
+        final Ignite ignite = startGrid(2);
 
         awaitPartitionMapExchange();
 
@@ -549,7 +576,7 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
                 if (i % 2 == 1) {
                     // Execute one more update to increase counter.
                     try (Transaction tx = 
ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
-                        cache.put(1000_0000, 1);
+                        cache.put(primaryKeys(jcache(0), 1, 100_000).get(0), 
1);
 
                         tx.commit();
                     }
@@ -956,38 +983,46 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testPutAllGetAll_SingleNode() throws Exception {
-        putAllGetAll(1, 0, 0, 64);
+        putAllGetAll(false, 1, 0, 0, 64);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testPutAllGetAll_SingleNode_SinglePartition() throws Exception 
{
-        putAllGetAll(1, 0, 0, 1);
+        putAllGetAll(false, 1, 0, 0, 1);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testPutAllGetAll_ClientServer_Backups0() throws Exception {
-        putAllGetAll(4, 2, 0, 64);
+        putAllGetAll(false, 4, 2, 0, 64);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testPutAllGetAll_ClientServer_Backups1() throws Exception {
-        putAllGetAll(4, 2, 1, 64);
+        putAllGetAll(false, 4, 2, 1, 64);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testPutAllGetAll_ClientServer_Backups2() throws Exception {
-        putAllGetAll(4, 2, 2, 64);
+        putAllGetAll(false, 4, 2, 2, 64);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllGetAll_ClientServer_Backups1_RestartCoordinator() 
throws Exception {
+        putAllGetAll(true, 4, 2, 1, 64);
     }
 
     /**
+     * @param restartCrd Coordinator restart flag.
      * @param srvs Number of server nodes.
      * @param clients Number of client nodes.
      * @param cacheBackups Number of cache backups.
@@ -995,6 +1030,7 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     private void putAllGetAll(
+        boolean restartCrd,
         final int srvs,
         final int clients,
         int cacheBackups,
@@ -1118,7 +1154,9 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
                 }
             };
 
-        readWriteTest(srvs,
+        readWriteTest(
+            restartCrd,
+            srvs,
             clients,
             cacheBackups,
             cacheParts,
@@ -1128,6 +1166,9 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
             null,
             writer,
             reader);
+
+        for (Ignite node : G.allGrids())
+            checkActiveQueriesCleanup(node);
     }
 
     /**
@@ -1349,7 +1390,9 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
                 }
             };
 
-        readWriteTest(srvs,
+        readWriteTest(
+            false,
+            srvs,
             clients,
             cacheBackups,
             cacheParts,
@@ -1486,7 +1529,9 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
                 }
             };
 
-        readWriteTest(srvs,
+        readWriteTest(
+            false,
+            srvs,
             clients,
             cacheBackups,
             cacheParts,
@@ -1535,6 +1580,8 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
 
         stopGrid(1);
 
+        checkActiveQueriesCleanup(ignite(0));
+
         verifyCoordinatorInternalState();
 
         try {
@@ -1671,43 +1718,79 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
     }
 
     /**
-     * TODO IGNITE-3478.
-     *
      * @throws Exception If failed.
      */
-    public void _testReadInProgressCoordinatorFails() throws Exception {
+    public void testReadInProgressCoordinatorFailsSimple_FromServer() throws 
Exception {
+        for (int i = 1; i <= 3; i++) {
+            readInProgressCoordinatorFailsSimple(false, i);
+
+            afterTest();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReadInProgressCoordinatorFailsSimple_FromClient() throws 
Exception {
+        for (int i = 1; i <= 3; i++) {
+            readInProgressCoordinatorFailsSimple(true, i);
+
+            afterTest();
+        }
+    }
+
+    /**
+     * @param fromClient {@code True} if read from client node, otherwise from 
server node.
+     * @param crdChangeCnt Number of coordinator changes.
+     * @throws Exception If failed.
+     */
+    private void readInProgressCoordinatorFailsSimple(boolean fromClient, int 
crdChangeCnt) throws Exception {
+        info("readInProgressCoordinatorFailsSimple [fromClient=" + fromClient 
+ ", crdChangeCnt=" + crdChangeCnt + ']');
+
         testSpi = true;
 
-        startGrids(4);
+        client = false;
+
+        final int SRVS = 3;
+        final int COORDS = crdChangeCnt + 1;
+
+        startGrids(SRVS + COORDS);
 
         client = true;
 
-        final Ignite client = startGrid(4);
+        assertTrue(startGrid(SRVS + COORDS).configuration().isClientMode());
 
-        final IgniteCache cache = 
client.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 
DFLT_PARTITION_COUNT).
-            setNodeFilter(new 
TestCacheNodeExcludingFilter(getTestIgniteInstanceName(0), 
getTestIgniteInstanceName(1))));
+        final Ignite getNode = fromClient ? ignite(SRVS + COORDS) : 
ignite(COORDS);
+
+        String[] excludeNodes = new String[COORDS];
+
+        for (int i = 0; i < COORDS; i++)
+            excludeNodes[i] = testNodeName(i);
+
+        final IgniteCache cache = 
getNode.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 
DFLT_PARTITION_COUNT).
+            setNodeFilter(new TestCacheNodeExcludingFilter(excludeNodes)));
 
         final Set<Integer> keys = new HashSet<>();
 
-        List<Integer> keys1 = primaryKeys(jcache(2), 10);
+        List<Integer> keys1 = primaryKeys(jcache(COORDS), 10);
 
         keys.addAll(keys1);
-        keys.addAll(primaryKeys(jcache(3), 10));
+        keys.addAll(primaryKeys(jcache(COORDS + 1), 10));
 
         Map<Integer, Integer> vals = new HashMap();
 
         for (Integer key : keys)
             vals.put(key, -1);
 
-        try (Transaction tx = client.transactions().txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
+        try (Transaction tx = getNode.transactions().txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
             cache.putAll(vals);
 
             tx.commit();
         }
 
-        final TestRecordingCommunicationSpi clientSpi = 
TestRecordingCommunicationSpi.spi(client);
+        final TestRecordingCommunicationSpi getNodeSpi = 
TestRecordingCommunicationSpi.spi(getNode);
 
-        clientSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+        getNodeSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() 
{
             @Override public boolean apply(ClusterNode node, Message msg) {
                 return msg instanceof GridNearGetRequest;
             }
@@ -1734,25 +1817,154 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
             }
         }, "get-thread");
 
-        clientSpi.waitForBlocked();
+        getNodeSpi.waitForBlocked();
 
-        final IgniteInternalFuture releaseWaitFut = GridTestUtils.runAsync(new 
Callable() {
-            @Override public Object call() throws Exception {
-                Thread.sleep(3000);
+        for (int i = 0; i < crdChangeCnt; i++)
+            stopGrid(i);
 
-                clientSpi.stopBlock(true);
+        for (int i = 0; i < 10; i++) {
+            vals = new HashMap();
+
+            for (Integer key : keys)
+                vals.put(key, i);
+
+            try (Transaction tx = getNode.transactions().txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
+                cache.putAll(vals);
+
+                tx.commit();
+            }
+        }
+
+        getNodeSpi.stopBlock(true);
+
+        getFut.get();
+
+        for (Ignite node : G.allGrids())
+            checkActiveQueriesCleanup(node);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCoordinatorChangeActiveQueryClientFails_Simple() throws 
Exception {
+        testSpi = true;
+
+        client = false;
+
+        final int SRVS = 3;
+        final int COORDS = 1;
+
+        startGrids(SRVS + COORDS);
+
+        client = true;
+
+        Ignite client = startGrid(SRVS + COORDS);
+
+        final IgniteCache cache = 
client.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 
DFLT_PARTITION_COUNT).
+            setNodeFilter(new TestCacheNodeExcludingFilter(testNodeName(0))));
+
+        final Map<Integer, Integer> vals = new HashMap();
+
+        for (int i = 0; i < 100; i++)
+            vals.put(i, i);
+
+        try (Transaction tx = client.transactions().txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
+            cache.putAll(vals);
+
+            tx.commit();
+        }
+
+        final TestRecordingCommunicationSpi clientSpi = 
TestRecordingCommunicationSpi.spi(client);
+
+        clientSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+            @Override public boolean apply(ClusterNode node, Message msg) {
+                return msg instanceof GridNearGetRequest;
+            }
+        });
+
+        IgniteInternalFuture getFut = GridTestUtils.runAsync(new Callable() {
+            @Override public Object call() throws Exception {
+                cache.getAll(vals.keySet());
 
                 return null;
             }
         }, "get-thread");
 
+        clientSpi.waitForBlocked();
+
         stopGrid(0);
 
-        for (int i = 0; i < 10; i++) {
-            vals = new HashMap();
+        stopGrid(client.name());
 
-            for (Integer key : keys)
-                vals.put(key, i);
+        try {
+            getFut.get();
+
+            fail();
+        }
+        catch (Exception ignore) {
+            // No-op.
+        }
+
+        for (Ignite node : G.allGrids())
+            checkActiveQueriesCleanup(node);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReadInProgressCoordinatorFails() throws Exception {
+        readInProgressCoordinatorFails(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReadInProgressCoordinatorFails_ReadDelay() throws 
Exception {
+        readInProgressCoordinatorFails(true);
+    }
+
+    /**
+     * @param readDelay {@code True} if get requests should be delayed.
+     * @throws Exception If failed.
+     */
+    private void readInProgressCoordinatorFails(boolean readDelay) throws 
Exception {
+        final int COORD_NODES = 5;
+        final int SRV_NODES = 4;
+
+        if (readDelay)
+            testSpi = true;
+
+        startGrids(COORD_NODES);
+
+        startGridsMultiThreaded(COORD_NODES, SRV_NODES);
+
+        client = true;
+
+        Ignite client = startGrid(COORD_NODES + SRV_NODES);
+
+        final List<String> cacheNames = new ArrayList<>();
+
+        final int KEYS = 100;
+
+        final Map<Integer, Integer> vals = new HashMap<>();
+
+        for (int i = 0; i < KEYS; i++)
+            vals.put(i, 0);
+
+        String[] exclude = new String[COORD_NODES];
+
+        for (int i = 0; i < COORD_NODES; i++)
+            exclude[i] = testNodeName(i);
+
+        for (CacheConfiguration ccfg : cacheConfigurations()) {
+            ccfg.setName("cache-" + cacheNames.size());
+
+            // The first COORD_NODES servers are 'dedicated' coordinators: exclude them from the cache.
+            ccfg.setNodeFilter(new TestCacheNodeExcludingFilter(exclude));
+
+            cacheNames.add(ccfg.getName());
+
+            IgniteCache cache = client.createCache(ccfg);
 
             try (Transaction tx = client.transactions().txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
                 cache.putAll(vals);
@@ -1761,8 +1973,372 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
             }
         }
 
-        releaseWaitFut.get();
-        getFut.get();
+        if (readDelay) {
+            for (int i = COORD_NODES; i < COORD_NODES + SRV_NODES + 1; i++) {
+                TestRecordingCommunicationSpi.spi(ignite(i)).closure(new 
IgniteBiInClosure<ClusterNode, Message>() {
+                    @Override public void apply(ClusterNode node, Message msg) 
{
+                        if (msg instanceof GridNearGetRequest)
+                            doSleep(ThreadLocalRandom.current().nextLong(50) + 
1);
+                    }
+                });
+            }
+        }
+
+        final AtomicBoolean done = new AtomicBoolean();
+
+        try {
+            final AtomicInteger readNodeIdx = new AtomicInteger(0);
+
+            IgniteInternalFuture getFut = 
GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    try {
+                        Ignite node = ignite(COORD_NODES + 
(readNodeIdx.getAndIncrement() % (SRV_NODES + 1)));
+
+                        int cnt = 0;
+
+                        while (!done.get()) {
+                            for (String cacheName : cacheNames) {
+                                IgniteCache cache = node.cache(cacheName);
+
+                                Map<Integer, Integer> res = 
cache.getAll(vals.keySet());
+
+                                assertEquals(vals.size(), res.size());
+
+                                Integer val0 = null;
+
+                                for (Integer val : res.values()) {
+                                    if (val0 == null)
+                                        val0 = val;
+                                    else
+                                        assertEquals(val0, val);
+                                }
+                            }
+
+                            cnt++;
+                        }
+
+                        log.info("Finished [node=" + node.name() + ", 
readCnt=" + cnt + ']');
+
+                        return null;
+                    }
+                    catch (Throwable e) {
+                        error("Unexpected error: " + e, e);
+
+                        throw e;
+                    }
+                }
+            }, (SRV_NODES + 1) + 1, "get-thread");
+
+            IgniteInternalFuture putFut1 = GridTestUtils.runAsync(new 
Callable() {
+                @Override public Void call() throws Exception {
+                    Ignite node = ignite(COORD_NODES);
+
+                    List<IgniteCache> caches = new ArrayList<>();
+
+                    for (String cacheName : cacheNames)
+                        caches.add(node.cache(cacheName));
+
+                    Integer val = 1;
+
+                    while (!done.get()) {
+                        Map<Integer, Integer> vals = new HashMap<>();
+
+                        for (int i = 0; i < KEYS; i++)
+                            vals.put(i, val);
+
+                        for (IgniteCache cache : caches) {
+                            try {
+                                try (Transaction tx = 
node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                                    cache.putAll(vals);
+
+                                    tx.commit();
+                                }
+                            }
+                            catch (ClusterTopologyException e) {
+                                info("Tx failed: " + e);
+                            }
+                        }
+
+                        val++;
+                    }
+
+                    return null;
+                }
+            }, "putAll-thread");
+
+            IgniteInternalFuture putFut2 = GridTestUtils.runAsync(new 
Callable() {
+                @Override public Void call() throws Exception {
+                    Ignite node = ignite(COORD_NODES);
+
+                    IgniteCache cache = node.cache(cacheNames.get(0));
+
+                    Integer val = 0;
+
+                    while (!done.get()) {
+                        try {
+                            try (Transaction tx = 
node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                                cache.put(Integer.MAX_VALUE, val);
+
+                                tx.commit();
+                            }
+                        }
+                        catch (ClusterTopologyException e) {
+                            info("Tx failed: " + e);
+                        }
+
+                        val++;
+                    }
+
+                    return null;
+                }
+            }, "put-thread");
+
+            for (int i = 0; i < COORD_NODES && !getFut.isDone(); i++) {
+                U.sleep(3000);
+
+                stopGrid(i);
+
+                awaitPartitionMapExchange();
+            }
+
+            done.set(true);
+
+            getFut.get();
+            putFut1.get();
+            putFut2.get();
+
+            for (Ignite node : G.allGrids())
+                checkActiveQueriesCleanup(node);
+        }
+        finally {
+            done.set(true);
+        }
+
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMvccCoordinatorChangeSimple() throws Exception {
+        Ignite srv0 = startGrid(0);
+
+        final List<String> cacheNames = new ArrayList<>();
+
+        for (CacheConfiguration ccfg : cacheConfigurations()) {
+            ccfg.setName("cache-" + cacheNames.size());
+
+            cacheNames.add(ccfg.getName());
+
+            srv0.createCache(ccfg);
+        }
+
+        checkPutGet(cacheNames);
+
+        for (int i = 0; i < 3; i++) {
+            startGrid(i + 1);
+
+            checkPutGet(cacheNames);
+
+            checkCoordinatorsConsistency(null);
+        }
+
+        client = true;
+
+        for (int i = 0; i < 3; i++) {
+            Ignite node = startGrid(i + 4);
+
+            // Init client caches outside of transactions.
+            for (String cacheName : cacheNames)
+                node.cache(cacheName);
+
+            checkPutGet(cacheNames);
+
+            checkCoordinatorsConsistency(null);
+        }
+
+        for (int i = 0; i < 3; i++) {
+            stopGrid(i);
+
+            awaitPartitionMapExchange();
+
+            checkPutGet(cacheNames);
+
+            checkCoordinatorsConsistency(null);
+        }
+    }
+
+    /**
+     * @param cacheNames Cache names.
+     */
+    private void checkPutGet(List<String> cacheNames) {
+        List<Ignite> nodes = G.allGrids();
+
+        assertFalse(nodes.isEmpty());
+
+        Ignite putNode = 
nodes.get(ThreadLocalRandom.current().nextInt(nodes.size()));
+
+        Map<Integer, Integer> vals = new HashMap();
+
+        Integer val = ThreadLocalRandom.current().nextInt();
+
+        for (int i = 0; i < 10; i++)
+            vals.put(i, val);
+
+        try (Transaction tx = putNode.transactions().txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
+            for (String cacheName : cacheNames)
+                putNode.cache(cacheName).putAll(vals);
+
+            tx.commit();
+        }
+
+        for (Ignite node : nodes) {
+            for (String cacheName : cacheNames) {
+                Map<Object, Object> res = 
node.cache(cacheName).getAll(vals.keySet());
+
+                assertEquals(vals, res);
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMvccCoordinatorInfoConsistency() throws Exception {
+        for (int i = 0; i < 4; i++) {
+            startGrid(i);
+
+            checkCoordinatorsConsistency(i + 1);
+        }
+
+        client = true;
+
+        startGrid(4);
+
+        checkCoordinatorsConsistency(5);
+
+        startGrid(5);
+
+        checkCoordinatorsConsistency(6);
+
+        client = false;
+
+        stopGrid(0);
+
+        checkCoordinatorsConsistency(5);
+    }
+
+    /**
+     * @param expNodes Expected nodes number.
+     */
+    private void checkCoordinatorsConsistency(@Nullable Integer expNodes) {
+        List<Ignite> nodes = G.allGrids();
+
+        if (expNodes != null)
+            assertEquals(expNodes, (Integer)nodes.size());
+
+        MvccCoordinator crd = null;
+
+        for (Ignite node : G.allGrids()) {
+            CacheCoordinatorsProcessor crdProc = ((IgniteKernal) 
node).context().cache().context().coordinators();
+
+            MvccCoordinator crd0 = crdProc.currentCoordinator();
+
+            if (crd != null)
+                assertEquals(crd, crd0);
+            else
+                crd = crd0;
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetVersionRequestFailover() throws Exception {
+        final int NODES = 5;
+
+        testSpi = true;
+
+        startGridsMultiThreaded(NODES - 1);
+
+        client = true;
+
+        Ignite client = startGrid(NODES - 1);
+
+        final List<String> cacheNames = new ArrayList<>();
+
+        final Map<Integer, Integer> vals = new HashMap<>();
+
+        for (int i = 0; i < 100; i++)
+            vals.put(i, i);
+
+        for (CacheConfiguration ccfg : cacheConfigurations()) {
+            ccfg.setName("cache-" + cacheNames.size());
+
+            ccfg.setNodeFilter(new 
TestCacheNodeExcludingFilter(getTestIgniteInstanceName(0)));
+
+            cacheNames.add(ccfg.getName());
+
+            IgniteCache cache = client.createCache(ccfg);
+
+            try (Transaction tx = client.transactions().txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
+                cache.putAll(vals);
+
+                tx.commit();
+            }
+        }
+
+        final AtomicInteger nodeIdx = new AtomicInteger(1);
+
+        final AtomicBoolean done = new AtomicBoolean();
+
+        try {
+            IgniteInternalFuture getFut = 
GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    Ignite node = ignite(nodeIdx.getAndIncrement());
+
+                    int cnt = 0;
+
+                    while (!done.get()) {
+                        for (String cacheName : cacheNames) {
+                            IgniteCache cache = node.cache(cacheName);
+
+                            Map<Integer, Integer> res = 
cache.getAll(vals.keySet());
+
+                            assertEquals(vals, res);
+                        }
+
+                        cnt++;
+                    }
+
+                    log.info("Finished [node=" + node.name() + ", cnt=" + cnt 
+ ']');
+
+                    return null;
+                }
+            }, NODES - 1, "get-thread");
+
+            doSleep(1000);
+
+            TestRecordingCommunicationSpi crdSpi = 
TestRecordingCommunicationSpi.spi(ignite(0));
+
+            crdSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() 
{
+                @Override public boolean apply(ClusterNode node, Message msg) {
+                    return msg instanceof MvccCoordinatorVersionResponse;
+                }
+            });
+
+            crdSpi.waitForBlocked();
+
+            stopGrid(0);
+
+            doSleep(1000);
+
+            done.set(true);
+
+            getFut.get();
+        }
+        finally {
+            done.set(true);
+        }
     }
 
     /**
@@ -1899,7 +2475,9 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
                 }
             };
 
-        readWriteTest(srvs,
+        readWriteTest(
+            false,
+            srvs,
             clients,
             cacheBackups,
             cacheParts,
@@ -1925,6 +2503,7 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     private void readWriteTest(
+        final boolean restartCrd,
         final int srvs,
         final int clients,
         int cacheBackups,
@@ -1935,18 +2514,36 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
         IgniteInClosure<IgniteCache<Object, Object>> init,
         final GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> writer,
         final GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> 
reader) throws Exception {
+        if (restartCrd)
+            CacheCoordinatorsProcessor.coordinatorAssignClosure(new 
CoordinatorAssignClosure());
+
         Ignite srv0 = startGridsMultiThreaded(srvs);
 
         if (clients > 0) {
             client = true;
 
             startGridsMultiThreaded(srvs, clients);
+
+            client = false;
         }
 
-        IgniteCache<Object, Object> cache = 
srv0.createCache(cacheConfiguration(PARTITIONED,
+        CacheConfiguration<Object, Object> ccfg = 
cacheConfiguration(PARTITIONED,
             FULL_SYNC,
             cacheBackups,
-            cacheParts));
+            cacheParts);
+
+        if (restartCrd)
+            ccfg.setNodeFilter(new CoordinatorNodeFilter());
+
+        IgniteCache<Object, Object> cache = srv0.createCache(ccfg);
+
+        int crdIdx = srvs + clients;
+
+        if (restartCrd) {
+            nodeAttr = CRD_ATTR;
+
+            startGrid(crdIdx);
+        }
 
         if (init != null)
             init.apply(cache);
@@ -1974,6 +2571,12 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
                         writer.apply(idx, caches, stop);
                     }
                     catch (Throwable e) {
+                        if (restartCrd && X.hasCause(e, 
ClusterTopologyException.class)) {
+                            log.info("Writer error: " + e);
+
+                            return null;
+                        }
+
                         error("Unexpected error: " + e, e);
 
                         stop.set(true);
@@ -2006,9 +2609,24 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
                 }
             }, readers, "reader");
 
-            while (System.currentTimeMillis() < stopTime && !stop.get())
+            while (System.currentTimeMillis() < stopTime && !stop.get()) {
                 Thread.sleep(1000);
 
+                if (restartCrd) {
+                    log.info("Start new coordinator: " + (crdIdx + 1));
+
+                    startGrid(crdIdx + 1);
+
+                    log.info("Stop current coordinator: " + crdIdx);
+
+                    stopGrid(crdIdx);
+
+                    crdIdx++;
+
+                    awaitPartitionMapExchange();
+                }
+            }
+
             stop.set(true);
 
             writeFut.get();
@@ -2095,7 +2713,7 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
      */
     private void verifyCoordinatorInternalState() throws Exception {
         for (Ignite node : G.allGrids()) {
-            final CacheCoordinatorsSharedManager crd = 
((IgniteKernal)node).context().cache().context().coordinators();
+            final CacheCoordinatorsProcessor crd = 
((IgniteKernal)node).context().cache().context().coordinators();
 
             Map activeTxs = GridTestUtils.getFieldValue(crd, "activeTxs");
 
@@ -2119,16 +2737,49 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     private void checkActiveQueriesCleanup(Ignite node) throws Exception {
-        final CacheCoordinatorsSharedManager crd = 
((IgniteKernal)node).context().cache().context().coordinators();
+        final CacheCoordinatorsProcessor crd = 
((IgniteKernal)node).context().cache().context().coordinators();
+
+        assertTrue("Active queries not cleared: " + node.name(), 
GridTestUtils.waitForCondition(
+            new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    Object activeQueries = GridTestUtils.getFieldValue(crd, 
"activeQueries");
+
+                    synchronized (activeQueries) {
+                        Long minQry = 
GridTestUtils.getFieldValue(activeQueries, "minQry");
+
+                        if (minQry != null)
+                            log.info("Min query: " + minQry);
+
+                        Map<Object, Map> queriesMap = 
GridTestUtils.getFieldValue(activeQueries, "activeQueries");
+
+                        boolean empty = true;
+
+                        for (Map.Entry<Object, Map> e : queriesMap.entrySet()) 
{
+                            if (!e.getValue().isEmpty()) {
+                                empty = false;
 
-        assertTrue(GridTestUtils.waitForCondition(
+                                log.info("Active queries: " + e);
+                            }
+                        }
+
+                        return empty && minQry == null;
+                    }
+                }
+            }, 8_000)
+        );
+
+        assertTrue("Previous coordinator queries not empty: " + node.name(), 
GridTestUtils.waitForCondition(
             new GridAbsPredicate() {
                 @Override public boolean apply() {
-                    Map activeQrys = GridTestUtils.getFieldValue(crd, 
"activeQueries");
+                    Map queries = GridTestUtils.getFieldValue(crd, 
"prevCrdQueries", "activeQueries");
+                    Boolean prevDone = GridTestUtils.getFieldValue(crd, 
"prevCrdQueries", "prevQueriesDone");
 
-                    return activeQrys.isEmpty();
+                    if (!queries.isEmpty() || !prevDone)
+                        log.info("Previous coordinator state [prevDone=" + 
prevDone + ", queries=" + queries + ']');
+
+                    return queries.isEmpty();
                 }
-            }, 5000)
+            }, 8_000)
         );
     }
 
@@ -2203,4 +2854,31 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
         /** */
         SCAN
     }
+
+    /**
+     * Node predicate that accepts only nodes on which the {@code CRD_ATTR} attribute is not set.
+     */
+    static class CoordinatorNodeFilter implements IgnitePredicate<ClusterNode> 
{
+        /** {@inheritDoc} */
+        @Override public boolean apply(ClusterNode node) {
+            return node.attribute(CRD_ATTR) == null;
+        }
+    }
+
+    /**
+     * Returns the first node of the given collection that has {@code CRD_ATTR} set, or {@code null} if there is none.
+     */
+    static class CoordinatorAssignClosure implements 
IgniteClosure<Collection<ClusterNode>, ClusterNode> {
+        @Override public ClusterNode apply(Collection<ClusterNode> 
clusterNodes) {
+            for (ClusterNode node : clusterNodes) {
+                if (node.attribute(CRD_ATTR) != null) {
+                    assert !CU.clientNode(node) : node;
+
+                    return node;
+                }
+            }
+
+            return null;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
index 64070d1..56d09f8 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
@@ -51,7 +51,6 @@ public class BPlusTreePageMemoryImplTest extends 
BPlusTreeSelfTest {
             null,
             null,
             null,
-            null,
             new NoOpPageStoreManager(),
             new NoOpWALManager(),
             new IgniteCacheDatabaseSharedManager(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
index 5bbf575..39183b2 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
@@ -52,7 +52,6 @@ public class BPlusTreeReuseListPageMemoryImplTest extends 
BPlusTreeReuseSelfTest
             null,
             null,
             null,
-            null,
             new NoOpPageStoreManager(),
             new NoOpWALManager(),
             new IgniteCacheDatabaseSharedManager(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java
index d16e525..a427c63 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java
@@ -67,7 +67,6 @@ public class MetadataStoragePageMemoryImplTest extends 
MetadataStorageSelfTest{
             null,
             null,
             null,
-            null,
             new NoOpPageStoreManager(),
             new NoOpWALManager(),
             new IgniteCacheDatabaseSharedManager(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
index bd849b1..467ede4 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
@@ -56,7 +56,6 @@ public class PageMemoryImplNoLoadTest extends 
PageMemoryNoLoadSelfTest {
             null,
             null,
             null,
-            null,
             new NoOpPageStoreManager(),
             new NoOpWALManager(),
             new IgniteCacheDatabaseSharedManager(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
index 37422fb..c5997fa 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
@@ -79,7 +79,6 @@ public class PageMemoryImplTest extends 
GridCommonAbstractTest {
             null,
             null,
             null,
-            null,
             new NoOpPageStoreManager(),
             new NoOpWALManager(),
             new IgniteCacheDatabaseSharedManager(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
 
b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index ee43309..6a1d4f4 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -38,7 +38,6 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheTtlManager;
 import 
org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager;
 import org.apache.ignite.internal.processors.cache.dr.GridOsCacheDrManager;
 import org.apache.ignite.internal.processors.cache.jta.CacheNoopJtaManager;
-import 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
 import 
org.apache.ignite.internal.processors.cache.query.GridCacheLocalQueryManager;
@@ -65,7 +64,6 @@ public class GridCacheTestContext<K, V> extends 
GridCacheContext<K, V> {
             ctx,
             new GridCacheSharedContext<>(
                 ctx,
-                new CacheCoordinatorsSharedManager(),
                 new IgniteTxManager(),
                 new GridCacheVersionManager(),
                 new GridCacheMvccManager(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 4965d16..094d14c 100755
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -1453,6 +1453,14 @@ public abstract class GridAbstractTest extends TestCase {
     }
 
     /**
+     * @param idx Index of the Ignite instance.
+     * @return Test Ignite instance name for the index, as returned by {@link #getTestIgniteInstanceName(int)}.
+     */
+    protected String testNodeName(int idx) {
+        return getTestIgniteInstanceName(idx);
+    }
+
+    /**
      * Parses test Ignite instance index from test Ignite instance name.
      *
      * @param testIgniteInstanceName Test Ignite instance name, returned by 
{@link #getTestIgniteInstanceName(int)}.

Reply via email to