Repository: ignite
Updated Branches:
  refs/heads/ignite-3479 6266d76ea -> eb141c6ea


ignite-3479


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3927aa10
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3927aa10
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3927aa10

Branch: refs/heads/ignite-3479
Commit: 3927aa105eb3342dbbe002c067f3e53493d4ed80
Parents: 7607029
Author: sboikov <[email protected]>
Authored: Thu Sep 28 15:07:32 2017 +0300
Committer: sboikov <[email protected]>
Committed: Thu Sep 28 15:07:32 2017 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   |   4 +-
 .../GridNearPessimisticTxPrepareFuture.java     |  16 +-
 .../cache/mvcc/CacheCoordinatorsProcessor.java  |  50 ++++--
 .../cache/mvcc/CacheMvccTransactionsTest.java   | 175 ++++++++++++++++++-
 .../testframework/junits/GridAbstractTest.java  |   8 +
 5 files changed, 224 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3927aa10/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index c1f6672..8f03911 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -1679,7 +1679,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         ClusterNode node = ctx.discovery().node(nodeId);
 
         if (node == null)
-            throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
+            throw new ClusterTopologyCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
 
         sendToCustomTopic(node, topic, msg, plc);
     }
@@ -1697,7 +1697,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         ClusterNode node = ctx.discovery().node(nodeId);
 
         if (node == null)
-            throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
+            throw new ClusterTopologyCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
 
         send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3927aa10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index dbfea8b..2001011 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -81,17 +81,19 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
         boolean found = false;
 
         for (IgniteInternalFuture<?> fut : futures()) {
-            MiniFuture f = (MiniFuture)fut;
+            if (fut instanceof MiniFuture) {
+                MiniFuture f = (MiniFuture)fut;
 
-            if (f.primary().id().equals(nodeId)) {
-                ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Remote node left grid: " +
-                    nodeId);
+                if (f.primary().id().equals(nodeId)) {
+                    ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Remote node left grid: " +
+                        nodeId);
 
-                e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
+                    e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
 
-                f.onNodeLeft(e);
+                    f.onNodeLeft(e);
 
-                found = true;
+                    found = true;
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3927aa10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index b50b0a4..68b7f15 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -58,8 +58,11 @@ import org.apache.ignite.spi.discovery.DiscoveryDataBag;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.LongAdder8;
 
+import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED;
+import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
 import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
 
@@ -179,31 +182,40 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
      * @param topVer Topology version.
      */
     public void onDiscoveryEvent(int evtType, Collection<ClusterNode> nodes, long topVer) {
-        MvccCoordinator crd = discoData.coordinator();
+        if (evtType == EVT_NODE_METRICS_UPDATED)
+            return;
 
-        if (crd == null ||
-            ((evtType == EVT_NODE_FAILED || evtType == EVT_NODE_LEFT) && !F.nodeIds(nodes).contains(crd.nodeId()))) {
-            ClusterNode crdNode = null;
+        MvccCoordinator crd;
 
-            // Expect nodes are sorted by order.
-            for (ClusterNode node : nodes) {
-                if (!CU.clientNode(node)) {
-                    crdNode = node;
+        if (evtType == EVT_NODE_SEGMENTED || evtType == EVT_CLIENT_NODE_DISCONNECTED)
+            crd = null;
+        else {
+            crd = discoData.coordinator();
 
-                    break;
-                }
-            }
+            if (crd == null ||
+                ((evtType == EVT_NODE_FAILED || evtType == EVT_NODE_LEFT) && !F.nodeIds(nodes).contains(crd.nodeId()))) {
+                ClusterNode crdNode = null;
 
-            crd = crdNode != null ? new
-                MvccCoordinator(crdNode.id(), topVer, new AffinityTopologyVersion(topVer, 0)) : null;
+                // Expect nodes are sorted by order.
+                for (ClusterNode node : nodes) {
+                    if (!CU.clientNode(node)) {
+                        crdNode = node;
 
-            if (crd != null)
-                log.info("Assigned mvcc coordinator: " + crd);
-            else
-                U.warn(log, "New mvcc coordinator was not assigned [topVer=" + topVer + ']');
+                        break;
+                    }
+                }
+
+                crd = crdNode != null ? new
+                    MvccCoordinator(crdNode.id(), topVer, new AffinityTopologyVersion(topVer, 0)) : null;
 
-            discoData = new CacheCoordinatorsDiscoveryData(crd);
+                if (crd != null)
+                    log.info("Assigned mvcc coordinator: " + crd);
+                else
+                    U.warn(log, "New mvcc coordinator was not assigned [topVer=" + topVer + ']');
+            }
         }
+
+        discoData = new CacheCoordinatorsDiscoveryData(crd);
     }
 
     /**
@@ -1040,6 +1052,8 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
          * @param crdId Coordinator node ID.
          */
         WaitAckFuture(long id, UUID crdId, boolean ackTx) {
+            assert crdId != null;
+
             this.id = id;
             this.crdId = crdId;
             this.ackTx = ackTx;

http://git-wip-us.apache.org/repos/asf/ignite/blob/3927aa10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 35c9011..bec2725 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -1676,14 +1676,14 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testReadInProgressCoordinatorFails_FromServer() throws Exception {
+    public void testReadInProgressCoordinatorFailsSimple_FromServer() throws Exception {
         readInProgressCoordinatorFails(false);
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testReadInProgressCoordinatorFails_FromClient() throws Exception {
+    public void testReadInProgressCoordinatorFailsSimple_FromClient() throws Exception {
         readInProgressCoordinatorFails(true);
     }
 
@@ -1789,6 +1789,177 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testReadInProgressCoordinatorFails() throws Exception {
+        startGrids(3);
+
+        startGridsMultiThreaded(3, 4);
+
+        client = true;
+
+        Ignite client = startGrid(7);
+
+        final List<String> cacheNames = new ArrayList<>();
+
+        final int KEYS = 100;
+
+        final Map<Integer, Integer> vals = new HashMap<>();
+
+        for (int i = 0; i < KEYS; i++)
+            vals.put(i, 0);
+
+        for (CacheConfiguration ccfg : cacheConfigurations()) {
+            ccfg.setName("cache-" + cacheNames.size());
+
+            // First 3 server nodes are 'dedicated' coordinators.
+            ccfg.setNodeFilter(new TestCacheNodeExcludingFilter(
+                testNodeName(0), testNodeName(1), testNodeName(2)));
+
+            cacheNames.add(ccfg.getName());
+
+            IgniteCache cache = client.createCache(ccfg);
+
+            try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                cache.putAll(vals);
+
+                tx.commit();
+            }
+        }
+
+        final AtomicBoolean done = new AtomicBoolean();
+
+        try {
+            final AtomicInteger readNodeIdx = new AtomicInteger(0);
+
+            IgniteInternalFuture getFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    try {
+                        Ignite node = ignite(3 + (readNodeIdx.getAndIncrement() % 5));
+
+                        int cnt = 0;
+
+                        while (!done.get()) {
+                            for (String cacheName : cacheNames) {
+                                IgniteCache cache = node.cache(cacheName);
+
+                                Map<Integer, Integer> res = cache.getAll(vals.keySet());
+
+                                assertEquals(vals.size(), res.size());
+
+                                Integer val0 = null;
+
+                                for (Integer val : res.values()) {
+                                    if (val0 == null)
+                                        val0 = val;
+                                    else
+                                        assertEquals(val0, val);
+                                }
+                            }
+
+                            cnt++;
+                        }
+
+                        log.info("Finished [node=" + node.name() + ", readCnt=" + cnt + ']');
+
+                        return null;
+                    }
+                    catch (Throwable e) {
+                        error("Unexpected error: " + e, e);
+
+                        throw e;
+                    }
+                }
+            }, 10, "get-thread");
+
+            IgniteInternalFuture putFut1 = GridTestUtils.runAsync(new Callable() {
+                @Override public Void call() throws Exception {
+                    Ignite node = ignite(3);
+
+                    List<IgniteCache> caches = new ArrayList<>();
+
+                    for (String cacheName : cacheNames)
+                        caches.add(node.cache(cacheName));
+
+                    Integer val = 1;
+
+                    while (!done.get()) {
+                        Map<Integer, Integer> vals = new HashMap<>();
+
+                        for (int i = 0; i < KEYS; i++)
+                            vals.put(i, val);
+
+                        for (IgniteCache cache : caches) {
+                            try {
+                                try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                                    cache.putAll(vals);
+
+                                    tx.commit();
+                                }
+                            }
+                            catch (ClusterTopologyException e) {
+                                info("Tx failed: " + e);
+                            }
+                        }
+
+                        val++;
+                    }
+
+                    return null;
+                }
+            }, "putAll-thread");
+
+            IgniteInternalFuture putFut2 = GridTestUtils.runAsync(new Callable() {
+                @Override public Void call() throws Exception {
+                    Ignite node = ignite(3);
+
+                    IgniteCache cache = node.cache(cacheNames.get(0));
+
+                    Integer val = 0;
+
+                    while (!done.get()) {
+                        try {
+                            try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                                cache.put(Integer.MAX_VALUE, val);
+
+                                tx.commit();
+                            }
+                        }
+                        catch (ClusterTopologyException e) {
+                            info("Tx failed: " + e);
+                        }
+
+                        val++;
+                    }
+
+                    return null;
+                }
+            }, "put-thread");
+
+            for (int i = 0; i < 3 && !getFut.isDone(); i++) {
+                U.sleep(3000);
+
+                stopGrid(i);
+
+                awaitPartitionMapExchange();
+            }
+
+            done.set(true);
+
+            getFut.get();
+            putFut1.get();
+            putFut2.get();
+
+            for (Ignite node : G.allGrids())
+                checkActiveQueriesCleanup(node);
+        }
+        finally {
+            done.set(true);
+        }
+
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testMvccCoordinatorChangeSimple() throws Exception {
         Ignite srv0 = startGrid(0);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3927aa10/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 4965d16..0dcd65e 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -1453,6 +1453,14 @@ public abstract class GridAbstractTest extends TestCase {
     }
 
     /**
+     * @param idx Index of the Ignite instance.
+     * @return Indexed Ignite instance name.
+     */
+    public String testNodeName(int idx) {
+        return getTestIgniteInstanceName(idx);
+    }
+
+    /**
      * Parses test Ignite instance index from test Ignite instance name.
      *
      * @param testIgniteInstanceName Test Ignite instance name, returned by {@link #getTestIgniteInstanceName(int)}.

Reply via email to