Repository: ignite Updated Branches: refs/heads/ignite-3479 d92cfa435 -> b201a9647
ignite-3479 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b201a964 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b201a964 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b201a964 Branch: refs/heads/ignite-3479 Commit: b201a9647bb24692114a02aa2768dda844b82791 Parents: d92cfa4 Author: sboikov <[email protected]> Authored: Fri Sep 29 11:29:31 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri Sep 29 13:10:01 2017 +0300 ---------------------------------------------------------------------- .../managers/communication/GridIoManager.java | 3 + .../dht/GridPartitionedGetFuture.java | 6 +- .../cache/mvcc/CacheCoordinatorsProcessor.java | 2 +- .../processors/cache/mvcc/MvccQueryTracker.java | 34 ++++++--- .../cache/mvcc/PreviousCoordinatorQueries.java | 2 + .../cache/mvcc/CacheMvccTransactionsTest.java | 76 +++++++++++--------- .../testframework/junits/GridAbstractTest.java | 2 +- 7 files changed, 76 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b201a964/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 8f03911..adce492 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -1659,6 +1659,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (e.getCause() instanceof ClusterTopologyCheckedException) throw (ClusterTopologyCheckedException)e.getCause(); + if (!ctx.discovery().alive(node)) + throw new ClusterTopologyCheckedException("Failed to send message, node left: " + node.id()); + throw new IgniteCheckedException("Failed to send message (node may have left the grid or " + "TCP connection cannot be established due to firewall issues) " + "[node=" + node + ", topic=" + topic + http://git-wip-us.apache.org/repos/asf/ignite/blob/b201a964/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 4bfd0fe..68bc705 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -133,9 +133,11 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda if (!cctx.mvccEnabled()) return null; - assert mvccTracker != null; + MvccCoordinatorVersion ver = mvccTracker.mvccVersion(); - return mvccTracker.mvccVersion(); + assert ver != null : "[fut=" + this + ", mvccTracker=" + mvccTracker + "]"; + + return ver; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/b201a964/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java index cfd6c4a..753ee33 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java @@ -210,7 +210,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { MvccCoordinator(crdNode.id(), topVer, new AffinityTopologyVersion(topVer, 0)) : null; if (crd != null) - log.info("Assigned mvcc coordinator: " + crd); + log.info("Assigned mvcc coordinator [crd=" + crd + ", crdNode=" + crdNode +']'); else U.warn(log, "New mvcc coordinator was not assigned [topVer=" + topVer + ']'); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b201a964/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java index a460820..360af4c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java @@ -29,7 +29,7 @@ import org.apache.ignite.lang.IgniteInClosure; import org.jetbrains.annotations.Nullable; /** - * + * TODO IGNITE-3478: make sure clean up is called when related future is forcibly finished, i.e. on cache stop */ public class MvccQueryTracker { /** */ @@ -75,12 +75,18 @@ public class MvccQueryTracker { @Nullable public MvccCoordinatorVersion onMvccCoordinatorChange(MvccCoordinator newCrd) { synchronized (this) { if (mvccVer != null) { - mvccCrd = newCrd; + assert mvccCrd != null : this; + + if (!mvccCrd.equals(newCrd)) { + mvccCrd = newCrd; // Need notify new coordinator. - return mvccVer; + return mvccVer; + } + else + return null; } else if (mvccCrd != null) - mvccCrd = null; + mvccCrd = null; // Mark for remap. return null; } @@ -100,7 +106,7 @@ public class MvccQueryTracker { mvccCrd0 = mvccCrd; mvccVer0 = mvccVer; - mvccVer = null; + mvccVer = null; // Mark as finished. } } @@ -134,12 +140,15 @@ public class MvccQueryTracker { return; } - else + else { waitNextTopology(topVer); + + return; + } } IgniteInternalFuture<MvccCoordinatorVersion> cntrFut = - cctx.shared().coordinators().requestQueryCounter(mvccCrd); + cctx.shared().coordinators().requestQueryCounter(mvccCrd0); cntrFut.listen(new IgniteInClosure<IgniteInternalFuture<MvccCoordinatorVersion>>() { @Override public void apply(IgniteInternalFuture<MvccCoordinatorVersion> fut) { @@ -151,8 +160,13 @@ public class MvccQueryTracker { boolean needRemap = false; synchronized (MvccQueryTracker.this) { - if (mvccCrd != null) + assert mvccVer == null : "[this=" + MvccQueryTracker.this + + ", ver=" + mvccVer + + ", rcvdVer=" + rcvdVer + "]"; + + if (mvccCrd != null) { mvccVer = rcvdVer; + } else needRemap = true; } @@ -167,7 +181,7 @@ public class MvccQueryTracker { IgniteLogger log = cctx.logger(MvccQueryTracker.class); if (log.isDebugEnabled()) - log.debug("Mvcc coordinator failed: " + e); + log.debug("Mvcc coordinator failed, need remap: " + e); } catch (IgniteCheckedException e) { lsnr.onMvccVersionError(e); @@ -175,7 +189,7 @@ public class MvccQueryTracker { return; } - // Coordinator failed on reassigned, need remap. + // Coordinator failed or reassigned, need remap. if (canRemap) waitNextTopology(topVer); else { http://git-wip-us.apache.org/repos/asf/ignite/blob/b201a964/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java index 0810b0f..700b27d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java @@ -183,6 +183,8 @@ class PreviousCoordinatorQueries { prevQueriesDone = activeQueries.isEmpty(); } } + else + nodeQueries.put(cntr, newQryCnt); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b201a964/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java index 074c4f8..487f2d0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java @@ -1677,40 +1677,61 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testReadInProgressCoordinatorFailsSimple_FromServer() throws Exception { - readInProgressCoordinatorFailsSimple(false); + for (int i = 1; i <= 3; i++) { + readInProgressCoordinatorFailsSimple(false, i); + + afterTest(); + } } /** * @throws Exception If failed. */ public void testReadInProgressCoordinatorFailsSimple_FromClient() throws Exception { - readInProgressCoordinatorFailsSimple(true); + for (int i = 1; i <= 3; i++) { + readInProgressCoordinatorFailsSimple(true, i); + + afterTest(); + } } /** * @param fromClient {@code True} if read from client node, otherwise from server node. + * @param crdChangeCnt Number of coordinator changes. * @throws Exception If failed. */ - private void readInProgressCoordinatorFailsSimple(boolean fromClient) throws Exception { + private void readInProgressCoordinatorFailsSimple(boolean fromClient, int crdChangeCnt) throws Exception { + info("readInProgressCoordinatorFailsSimple [fromClient=" + fromClient + ", crdChangeCnt=" + crdChangeCnt + ']'); + testSpi = true; - startGrids(4); + client = false; + + final int SRVS = 3; + final int COORDS = crdChangeCnt + 1; + + startGrids(SRVS + COORDS); client = true; - assertTrue(startGrid(4).configuration().isClientMode()); + assertTrue(startGrid(SRVS + COORDS).configuration().isClientMode()); - final Ignite getNode = fromClient ? ignite(4) : ignite(1); + final Ignite getNode = fromClient ? ignite(SRVS + COORDS) : ignite(COORDS); + + String[] excludeNodes = new String[COORDS]; + + for (int i = 0; i < COORDS; i++) + excludeNodes[i] = testNodeName(i); final IgniteCache cache = getNode.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT). - setNodeFilter(new TestCacheNodeExcludingFilter(getTestIgniteInstanceName(0), getTestIgniteInstanceName(1)))); + setNodeFilter(new TestCacheNodeExcludingFilter(excludeNodes))); final Set<Integer> keys = new HashSet<>(); - List<Integer> keys1 = primaryKeys(jcache(2), 10); + List<Integer> keys1 = primaryKeys(jcache(COORDS), 10); keys.addAll(keys1); - keys.addAll(primaryKeys(jcache(3), 10)); + keys.addAll(primaryKeys(jcache(COORDS + 1), 10)); Map<Integer, Integer> vals = new HashMap(); @@ -1754,17 +1775,8 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { getNodeSpi.waitForBlocked(); - final IgniteInternalFuture releaseWaitFut = GridTestUtils.runAsync(new Callable() { - @Override public Object call() throws Exception { - Thread.sleep(3000); - - getNodeSpi.stopBlock(true); - - return null; - } - }, "stop-block"); - - stopGrid(0); + for (int i = 0; i < crdChangeCnt; i++) + stopGrid(i); for (int i = 0; i < 10; i++) { vals = new HashMap(); @@ -1779,7 +1791,8 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } } - releaseWaitFut.get(); + getNodeSpi.stopBlock(true); + getFut.get(); for (Ignite node : G.allGrids()) @@ -1792,6 +1805,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { public void testReadInProgressCoordinatorFails() throws Exception { readInProgressCoordinatorFails(false); } + /** * @throws Exception If failed. */ @@ -2573,7 +2587,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { private void checkActiveQueriesCleanup(Ignite node) throws Exception { final CacheCoordinatorsProcessor crd = ((IgniteKernal)node).context().cache().context().coordinators(); - assertTrue("Active queries not cleared", GridTestUtils.waitForCondition( + assertTrue("Active queries not cleared: " + node.name(), GridTestUtils.waitForCondition( new GridAbsPredicate() { @Override public boolean apply() { Object activeQueries = GridTestUtils.getFieldValue(crd, "activeQueries"); @@ -2601,28 +2615,20 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } }, 8_000) ); - assertTrue("Previous coordinator queries not empty", GridTestUtils.waitForCondition( + + assertTrue("Previous coordinator queries not empty: " + node.name(), GridTestUtils.waitForCondition( new GridAbsPredicate() { @Override public boolean apply() { Map queries = GridTestUtils.getFieldValue(crd, "prevCrdQueries", "activeQueries"); + Boolean prevDone = GridTestUtils.getFieldValue(crd, "prevCrdQueries", "prevQueriesDone"); - if (!queries.isEmpty()) - log.info("Previous coordinator queries: " + queries); + if (!queries.isEmpty() || !prevDone) + log.info("Previous coordinator state [prevDone=" + prevDone + ", queries=" + queries + ']'); return queries.isEmpty(); } }, 8_000) ); - - if (node.cluster().localNode().id().equals(crd.currentCoordinatorId())) { - assertTrue("prevQueriesDone flag is not set", GridTestUtils.waitForCondition( - new GridAbsPredicate() { - @Override public boolean apply() { - return GridTestUtils.getFieldValue(crd, "prevCrdQueries", "prevQueriesDone"); - } - }, 8_000) - ); - } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/b201a964/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java index 0dcd65e..094d14c 100755 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java @@ -1456,7 +1456,7 @@ public abstract class GridAbstractTest extends TestCase { * @param idx Index of the Ignite instance. * @return Indexed Ignite instance name. */ - public String testNodeName(int idx) { + protected String testNodeName(int idx) { return getTestIgniteInstanceName(idx); }
