Repository: ignite
Updated Branches:
  refs/heads/ignite-3479 f4f21b729 -> d86ab340d


ignite-3479


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d86ab340
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d86ab340
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d86ab340

Branch: refs/heads/ignite-3479
Commit: d86ab340d11f27a59f97b7755a6d5f7ec5daaae7
Parents: f4f21b7
Author: sboikov <[email protected]>
Authored: Fri Sep 29 14:12:56 2017 +0300
Committer: sboikov <[email protected]>
Committed: Fri Sep 29 14:12:56 2017 +0300

----------------------------------------------------------------------
 .../cache/mvcc/CacheCoordinatorsProcessor.java  |  30 +++-
 .../cache/mvcc/CacheMvccTransactionsTest.java   | 136 +++++++++++++++++--
 2 files changed, 149 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d86ab340/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index 753ee33..ac55164 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -53,6 +53,7 @@ import 
org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag;
@@ -125,6 +126,9 @@ public class CacheCoordinatorsProcessor extends 
GridProcessorAdapter {
     /** */
     private CacheCoordinatorsDiscoveryData discoData = new 
CacheCoordinatorsDiscoveryData(null);
 
+    /** For tests only. */
+    private static IgniteClosure<Collection<ClusterNode>, ClusterNode> crdC;
+
     /**
      * @param ctx Context.
      */
@@ -178,6 +182,15 @@ public class CacheCoordinatorsProcessor extends 
GridProcessorAdapter {
     }
 
     /**
+     * For testing only.
+     *
+     * @param crdC Closure assigning coordinator.
+     */
+    static void 
coordinatorAssignClosure(IgniteClosure<Collection<ClusterNode>, ClusterNode> 
crdC) {
+        CacheCoordinatorsProcessor.crdC = crdC;
+    }
+
+    /**
      * @param evtType Event type.
      * @param nodes Current nodes.
      * @param topVer Topology version.
@@ -197,12 +210,19 @@ public class CacheCoordinatorsProcessor extends 
GridProcessorAdapter {
                 ((evtType == EVT_NODE_FAILED || evtType == EVT_NODE_LEFT) && 
!F.nodeIds(nodes).contains(crd.nodeId()))) {
                 ClusterNode crdNode = null;
 
-                // Expect nodes are sorted by order.
-                for (ClusterNode node : nodes) {
-                    if (!CU.clientNode(node)) {
-                        crdNode = node;
+                if (crdC != null) {
+                    crdNode = crdC.apply(nodes);
 
-                        break;
+                    log.info("Assigned coordinator using test closure: " + 
crd);
+                }
+                else {
+                    // Expect nodes are sorted by order.
+                    for (ClusterNode node : nodes) {
+                        if (!CU.clientNode(node)) {
+                            crdNode = node;
+
+                            break;
+                        }
                     }
                 }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d86ab340/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index ac16833..1b70747 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.mvcc;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
@@ -55,11 +56,15 @@ import org.apache.ignite.internal.util.lang.GridInClosure3;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -91,6 +96,9 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
     private static final int DFLT_PARTITION_COUNT = 
RendezvousAffinityFunction.DFLT_PARTITION_COUNT;
 
     /** */
+    private static final String CRD_ATTR = "testCrd";
+
+    /** */
     private static final long DFLT_TEST_TIME = 30_000;
 
     /** */
@@ -102,6 +110,9 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
     /** */
     private boolean testSpi;
 
+    /** */
+    private String nodeAttr;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -117,6 +128,9 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
 
         cfg.setClientMode(client);
 
+        if (nodeAttr != null)
+            cfg.setUserAttributes(F.asMap(nodeAttr, true));
+
         return cfg;
     }
 
@@ -126,6 +140,13 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
     }
 
     /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        CacheCoordinatorsProcessor.coordinatorAssignClosure(null);
+    }
+
+    /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         try {
             verifyCoordinatorInternalState();
@@ -133,6 +154,10 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
         finally {
             stopAllGrids();
         }
+
+        CacheCoordinatorsProcessor.coordinatorAssignClosure(null);
+
+        super.afterTest();
     }
 
     /**
@@ -958,38 +983,46 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testPutAllGetAll_SingleNode() throws Exception {
-        putAllGetAll(1, 0, 0, 64);
+        putAllGetAll(false, 1, 0, 0, 64);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testPutAllGetAll_SingleNode_SinglePartition() throws Exception 
{
-        putAllGetAll(1, 0, 0, 1);
+        putAllGetAll(false, 1, 0, 0, 1);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testPutAllGetAll_ClientServer_Backups0() throws Exception {
-        putAllGetAll(4, 2, 0, 64);
+        putAllGetAll(false, 4, 2, 0, 64);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testPutAllGetAll_ClientServer_Backups1() throws Exception {
-        putAllGetAll(4, 2, 1, 64);
+        putAllGetAll(false, 4, 2, 1, 64);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testPutAllGetAll_ClientServer_Backups2() throws Exception {
-        putAllGetAll(4, 2, 2, 64);
+        putAllGetAll(false, 4, 2, 2, 64);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllGetAll_ClientServer_Backups1_RestartCoordinator() 
throws Exception {
+        putAllGetAll(true, 4, 2, 1, 64);
     }
 
     /**
+     * @param restartCrd Coordinator restart flag.
      * @param srvs Number of server nodes.
      * @param clients Number of client nodes.
      * @param cacheBackups Number of cache backups.
@@ -997,6 +1030,7 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     private void putAllGetAll(
+        boolean restartCrd,
         final int srvs,
         final int clients,
         int cacheBackups,
@@ -1120,7 +1154,9 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
                 }
             };
 
-        readWriteTest(srvs,
+        readWriteTest(
+            restartCrd,
+            srvs,
             clients,
             cacheBackups,
             cacheParts,
@@ -1130,6 +1166,9 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
             null,
             writer,
             reader);
+
+        for (Ignite node : G.allGrids())
+            checkActiveQueriesCleanup(node);
     }
 
     /**
@@ -1351,7 +1390,9 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
                 }
             };
 
-        readWriteTest(srvs,
+        readWriteTest(
+            false,
+            srvs,
             clients,
             cacheBackups,
             cacheParts,
@@ -1488,7 +1529,9 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
                 }
             };
 
-        readWriteTest(srvs,
+        readWriteTest(
+            false,
+            srvs,
             clients,
             cacheBackups,
             cacheParts,
@@ -2432,7 +2475,9 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
                 }
             };
 
-        readWriteTest(srvs,
+        readWriteTest(
+            false,
+            srvs,
             clients,
             cacheBackups,
             cacheParts,
@@ -2458,6 +2503,7 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     private void readWriteTest(
+        final boolean restartCrd,
         final int srvs,
         final int clients,
         int cacheBackups,
@@ -2468,18 +2514,36 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
         IgniteInClosure<IgniteCache<Object, Object>> init,
         final GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> writer,
         final GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> 
reader) throws Exception {
+        if (restartCrd)
+            CacheCoordinatorsProcessor.coordinatorAssignClosure(new 
CoordinatorAssignClosure());
+
         Ignite srv0 = startGridsMultiThreaded(srvs);
 
         if (clients > 0) {
             client = true;
 
             startGridsMultiThreaded(srvs, clients);
+
+            client = false;
         }
 
-        IgniteCache<Object, Object> cache = 
srv0.createCache(cacheConfiguration(PARTITIONED,
+        CacheConfiguration<Object, Object> ccfg = 
cacheConfiguration(PARTITIONED,
             FULL_SYNC,
             cacheBackups,
-            cacheParts));
+            cacheParts);
+
+        if (restartCrd)
+            ccfg.setNodeFilter(new CoordinatorNodeFilter());
+
+        IgniteCache<Object, Object> cache = srv0.createCache(ccfg);
+
+        int crdIdx = srvs + clients;
+
+        if (restartCrd) {
+            nodeAttr = CRD_ATTR;
+
+            startGrid(crdIdx);
+        }
 
         if (init != null)
             init.apply(cache);
@@ -2507,6 +2571,12 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
                         writer.apply(idx, caches, stop);
                     }
                     catch (Throwable e) {
+                        if (restartCrd && X.hasCause(e, 
ClusterTopologyException.class)) {
+                            log.info("Writer error: " + e);
+
+                            return null;
+                        }
+
                         error("Unexpected error: " + e, e);
 
                         stop.set(true);
@@ -2539,9 +2609,24 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
                 }
             }, readers, "reader");
 
-            while (System.currentTimeMillis() < stopTime && !stop.get())
+            while (System.currentTimeMillis() < stopTime && !stop.get()) {
                 Thread.sleep(1000);
 
+                if (restartCrd) {
+                    log.info("Start new coordinator: " + (crdIdx + 1));
+
+                    startGrid(crdIdx + 1);
+
+                    log.info("Stop current coordinator: " + crdIdx);
+
+                    stopGrid(crdIdx);
+
+                    crdIdx++;
+
+                    awaitPartitionMapExchange();
+                }
+            }
+
             stop.set(true);
 
             writeFut.get();
@@ -2769,4 +2854,31 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
         /** */
         SCAN
     }
+
+    /**
+     *
+     */
+    static class CoordinatorNodeFilter implements IgnitePredicate<ClusterNode> 
{
+        /** {@inheritDoc} */
+        @Override public boolean apply(ClusterNode node) {
+            return node.attribute(CRD_ATTR) == null;
+        }
+    }
+
+    /**
+     *
+     */
+    static class CoordinatorAssignClosure implements 
IgniteClosure<Collection<ClusterNode>, ClusterNode> {
+        @Override public ClusterNode apply(Collection<ClusterNode> 
clusterNodes) {
+            for (ClusterNode node : clusterNodes) {
+                if (node.attribute(CRD_ATTR) != null) {
+                    assert !CU.clientNode(node) : node;
+
+                    return node;
+                }
+            }
+
+            return null;
+        }
+    }
 }

Reply via email to