Repository: ignite Updated Branches: refs/heads/ignite-3479 6266d76ea -> eb141c6ea
ignite-3479 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3927aa10 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3927aa10 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3927aa10 Branch: refs/heads/ignite-3479 Commit: 3927aa105eb3342dbbe002c067f3e53493d4ed80 Parents: 7607029 Author: sboikov <[email protected]> Authored: Thu Sep 28 15:07:32 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu Sep 28 15:07:32 2017 +0300 ---------------------------------------------------------------------- .../managers/communication/GridIoManager.java | 4 +- .../GridNearPessimisticTxPrepareFuture.java | 16 +- .../cache/mvcc/CacheCoordinatorsProcessor.java | 50 ++++-- .../cache/mvcc/CacheMvccTransactionsTest.java | 175 ++++++++++++++++++- .../testframework/junits/GridAbstractTest.java | 8 + 5 files changed, 224 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3927aa10/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index c1f6672..8f03911 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -1679,7 +1679,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa ClusterNode node = ctx.discovery().node(nodeId); if (node == null) - throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId); + throw new ClusterTopologyCheckedException("Failed to send message to node (has node left grid?): " + nodeId); sendToCustomTopic(node, topic, msg, plc); } @@ -1697,7 +1697,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa ClusterNode node = ctx.discovery().node(nodeId); if (node == null) - throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId); + throw new ClusterTopologyCheckedException("Failed to send message to node (has node left grid?): " + nodeId); send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false); } http://git-wip-us.apache.org/repos/asf/ignite/blob/3927aa10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index dbfea8b..2001011 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -81,17 +81,19 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA boolean found = false; for (IgniteInternalFuture<?> fut : futures()) { - MiniFuture f = (MiniFuture)fut; + if (fut instanceof MiniFuture) { + MiniFuture f = (MiniFuture)fut; - if (f.primary().id().equals(nodeId)) { - ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Remote node left grid: " + - nodeId); + if (f.primary().id().equals(nodeId)) { + ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Remote node left grid: " + + nodeId); - e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion())); + e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion())); - f.onNodeLeft(e); + f.onNodeLeft(e); - found = true; + found = true; + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/3927aa10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java index b50b0a4..68b7f15 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java @@ -58,8 +58,11 @@ import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.jetbrains.annotations.Nullable; import org.jsr166.LongAdder8; +import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED; +import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED; import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; @@ -179,31 +182,40 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { * @param topVer Topology version. */ public void onDiscoveryEvent(int evtType, Collection<ClusterNode> nodes, long topVer) { - MvccCoordinator crd = discoData.coordinator(); + if (evtType == EVT_NODE_METRICS_UPDATED) + return; - if (crd == null || - ((evtType == EVT_NODE_FAILED || evtType == EVT_NODE_LEFT) && !F.nodeIds(nodes).contains(crd.nodeId()))) { - ClusterNode crdNode = null; + MvccCoordinator crd; - // Expect nodes are sorted by order. - for (ClusterNode node : nodes) { - if (!CU.clientNode(node)) { - crdNode = node; + if (evtType == EVT_NODE_SEGMENTED || evtType == EVT_CLIENT_NODE_DISCONNECTED) + crd = null; + else { + crd = discoData.coordinator(); - break; - } - } + if (crd == null || + ((evtType == EVT_NODE_FAILED || evtType == EVT_NODE_LEFT) && !F.nodeIds(nodes).contains(crd.nodeId()))) { + ClusterNode crdNode = null; - crd = crdNode != null ? new - MvccCoordinator(crdNode.id(), topVer, new AffinityTopologyVersion(topVer, 0)) : null; + // Expect nodes are sorted by order. + for (ClusterNode node : nodes) { + if (!CU.clientNode(node)) { + crdNode = node; - if (crd != null) - log.info("Assigned mvcc coordinator: " + crd); - else - U.warn(log, "New mvcc coordinator was not assigned [topVer=" + topVer + ']'); + break; + } + } + + crd = crdNode != null ? new + MvccCoordinator(crdNode.id(), topVer, new AffinityTopologyVersion(topVer, 0)) : null; - discoData = new CacheCoordinatorsDiscoveryData(crd); + if (crd != null) + log.info("Assigned mvcc coordinator: " + crd); + else + U.warn(log, "New mvcc coordinator was not assigned [topVer=" + topVer + ']'); + } } + + discoData = new CacheCoordinatorsDiscoveryData(crd); } /** @@ -1040,6 +1052,8 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { * @param crdId Coordinator node ID. */ WaitAckFuture(long id, UUID crdId, boolean ackTx) { + assert crdId != null; + this.id = id; this.crdId = crdId; this.ackTx = ackTx; http://git-wip-us.apache.org/repos/asf/ignite/blob/3927aa10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java index 35c9011..bec2725 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java @@ -1676,14 +1676,14 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void testReadInProgressCoordinatorFails_FromServer() throws Exception { + public void testReadInProgressCoordinatorFailsSimple_FromServer() throws Exception { readInProgressCoordinatorFails(false); } /** * @throws Exception If failed. */ - public void testReadInProgressCoordinatorFails_FromClient() throws Exception { + public void testReadInProgressCoordinatorFailsSimple_FromClient() throws Exception { readInProgressCoordinatorFails(true); } @@ -1789,6 +1789,177 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testReadInProgressCoordinatorFails() throws Exception { + startGrids(3); + + startGridsMultiThreaded(3, 4); + + client = true; + + Ignite client = startGrid(7); + + final List<String> cacheNames = new ArrayList<>(); + + final int KEYS = 100; + + final Map<Integer, Integer> vals = new HashMap<>(); + + for (int i = 0; i < KEYS; i++) + vals.put(i, 0); + + for (CacheConfiguration ccfg : cacheConfigurations()) { + ccfg.setName("cache-" + cacheNames.size()); + + // First 3 server nodes are 'dedicated' coordinators. + ccfg.setNodeFilter(new TestCacheNodeExcludingFilter( + testNodeName(0), testNodeName(1), testNodeName(2))); + + cacheNames.add(ccfg.getName()); + + IgniteCache cache = client.createCache(ccfg); + + try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.putAll(vals); + + tx.commit(); + } + } + + final AtomicBoolean done = new AtomicBoolean(); + + try { + final AtomicInteger readNodeIdx = new AtomicInteger(0); + + IgniteInternalFuture getFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + try { + Ignite node = ignite(3 + (readNodeIdx.getAndIncrement() % 5)); + + int cnt = 0; + + while (!done.get()) { + for (String cacheName : cacheNames) { + IgniteCache cache = node.cache(cacheName); + + Map<Integer, Integer> res = cache.getAll(vals.keySet()); + + assertEquals(vals.size(), res.size()); + + Integer val0 = null; + + for (Integer val : res.values()) { + if (val0 == null) + val0 = val; + else + assertEquals(val0, val); + } + } + + cnt++; + } + + log.info("Finished [node=" + node.name() + ", readCnt=" + cnt + ']'); + + return null; + } + catch (Throwable e) { + error("Unexpected error: " + e, e); + + throw e; + } + } + }, 10, "get-thread"); + + IgniteInternalFuture putFut1 = GridTestUtils.runAsync(new Callable() { + @Override public Void call() throws Exception { + Ignite node = ignite(3); + + List<IgniteCache> caches = new ArrayList<>(); + + for (String cacheName : cacheNames) + caches.add(node.cache(cacheName)); + + Integer val = 1; + + while (!done.get()) { + Map<Integer, Integer> vals = new HashMap<>(); + + for (int i = 0; i < KEYS; i++) + vals.put(i, val); + + for (IgniteCache cache : caches) { + try { + try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.putAll(vals); + + tx.commit(); + } + } + catch (ClusterTopologyException e) { + info("Tx failed: " + e); + } + } + + val++; + } + + return null; + } + }, "putAll-thread"); + + IgniteInternalFuture putFut2 = GridTestUtils.runAsync(new Callable() { + @Override public Void call() throws Exception { + Ignite node = ignite(3); + + IgniteCache cache = node.cache(cacheNames.get(0)); + + Integer val = 0; + + while (!done.get()) { + try { + try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(Integer.MAX_VALUE, val); + + tx.commit(); + } + } + catch (ClusterTopologyException e) { + info("Tx failed: " + e); + } + + val++; + } + + return null; + } + }, "put-thread"); + + for (int i = 0; i < 3 && !getFut.isDone(); i++) { + U.sleep(3000); + + stopGrid(i); + + awaitPartitionMapExchange(); + } + + done.set(true); + + getFut.get(); + putFut1.get(); + putFut2.get(); + + for (Ignite node : G.allGrids()) + checkActiveQueriesCleanup(node); + } + finally { + done.set(true); + } + + } + + /** + * @throws Exception If failed. + */ public void testMvccCoordinatorChangeSimple() throws Exception { Ignite srv0 = startGrid(0); http://git-wip-us.apache.org/repos/asf/ignite/blob/3927aa10/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java index 4965d16..0dcd65e 100755 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java @@ -1453,6 +1453,14 @@ public abstract class GridAbstractTest extends TestCase { } /** + * @param idx Index of the Ignite instance. + * @return Indexed Ignite instance name. + */ + public String testNodeName(int idx) { + return getTestIgniteInstanceName(idx); + } + + /** * Parses test Ignite instance index from test Ignite instance name. * * @param testIgniteInstanceName Test Ignite instance name, returned by {@link #getTestIgniteInstanceName(int)}.
