Repository: ignite Updated Branches: refs/heads/ignite-5578 25520bf97 -> 745395785
ignite-5578 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/74539578 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/74539578 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/74539578 Branch: refs/heads/ignite-5578 Commit: 7453957857b0089b39798e38df841fb4a25ca4b1 Parents: 25520bf Author: sboikov <[email protected]> Authored: Fri Aug 4 12:22:30 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri Aug 4 13:37:28 2017 +0300 ---------------------------------------------------------------------- .../affinity/GridAffinityAssignmentCache.java | 7 ++ .../distributed/dht/GridDhtTopologyFuture.java | 2 + .../GridDhtPartitionsExchangeFuture.java | 6 +- .../distributed/CacheExchangeMergeTest.java | 86 +++++++++++++++++--- 4 files changed, 88 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/74539578/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index 69d6a40..f921251 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.affinity; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -635,6 +636,12 @@ public class GridAffinityAssignmentCache { } } + /** + * @return All initialized versions. + */ + public Collection<AffinityTopologyVersion> cachedVersions() { + return affCache.keySet(); + } /** * Affinity ready future. Will remove itself from ready futures map. http://git-wip-us.apache.org/repos/asf/ignite/blob/74539578/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java index cc12960..0bcc4a8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java @@ -62,6 +62,8 @@ public interface GridDhtTopologyFuture extends IgniteInternalFuture<AffinityTopo * is completed before {@link GridFutureAdapter#onDone(Object, Throwable)} is called on * {@link GridDhtPartitionsExchangeFuture}, it is guaranteed that this method will return {@code true} * if affinity ready future is finished. + * <p> + * Also this method returns {@code false} for merged exchange futures. * * @return {@code True} if exchange is finished and result topology version can be used. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/74539578/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 67e41b3..30571ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -359,7 +359,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** {@inheritDoc} */ @Override public AffinityTopologyVersion topologyVersion() { - assert exchangeDone(); + /* + Should not be called before exchange is finished since result version can change in + case of merged exchanges. + */ + assert exchangeDone() : "Should not be called before exchange is finished"; return exchCtx.events().topologyVersion(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/74539578/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java index 3149385..fcb5276 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java @@ -50,6 +50,7 @@ import org.apache.ignite.internal.TestDelayingCommunicationSpi; import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage; @@ -80,6 +81,7 @@ import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT; /** * @@ -199,7 +201,6 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { } // TODO IGNITE-5578 joined merged node failed (client/server). - // TODO IGNITE-5578 check exchanges/affinity consistency. /** * @throws Exception If failed. @@ -486,6 +487,10 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { fut2.get(); checkCaches(); + + checkExchanges(srv0, 1, 3); + checkExchanges(ignite(1), 3); + checkExchanges(ignite(2), 3); } /** @@ -531,7 +536,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { cfgCache = true; - IgniteInternalFuture fut = startGrids(srv0, 1, nodes); + IgniteInternalFuture fut = startGridsAsync(srv0, 1, nodes); fut.get(); @@ -556,7 +561,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { for (int i = 0; i < 3; i++) { mergeExchangeWaitVersion(srv0, topVer + 3); - startGrids(srv0, topVer, 3).get(); + startGridsAsync(srv0, topVer, 3).get(); topVer += 3; } @@ -605,7 +610,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void testStartCacheOnJoinAndFailMerge() throws Exception { + public void testStartCacheOnJoinAndMergeWithFail() throws Exception { cfgCache = false; final Ignite srv0 = startGrids(2); @@ -614,13 +619,17 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { cfgCache = true; - IgniteInternalFuture fut = startGrids(srv0, 2, 2); + IgniteInternalFuture fut = startGridsAsync(srv0, 2, 2); stopGrid(1); fut.get(); checkCaches(); + + checkExchanges(srv0, 1, 2, 3, 5); + checkExchanges(ignite(2), 3, 5); + checkExchanges(ignite(3), 5); } /** @@ -635,7 +644,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { cfgCache = true; - IgniteInternalFuture fut = startGrids(srv0, 2, 2); + IgniteInternalFuture fut = startGridsAsync(srv0, 2, 2); stopGrid(0); @@ -656,7 +665,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { cfgCache = true; - IgniteInternalFuture fut = startGrids(srv0, 1, 2); + IgniteInternalFuture fut = startGridsAsync(srv0, 1, 2); stopGrid(0); @@ -686,6 +695,10 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { fut.get(); checkCaches(); + + checkExchanges(srv0, 1, 3); + checkExchanges(ignite(1), 3); + checkExchanges(ignite(2), 3); } /** @@ -717,6 +730,12 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { fut.get(); checkCaches(); + + checkExchanges(srv0, 1, 2, 3, 5); + checkExchanges(ignite(1), 2, 3, 5); + checkExchanges(ignite(2), 3, 5); + checkExchanges(ignite(3), 5); + checkExchanges(ignite(4), 5); } /** @@ -727,7 +746,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { mergeExchangeWaitVersion(srv0, 6); - IgniteInternalFuture fut = startGrids(srv0, 3, 3); + IgniteInternalFuture fut = startGridsAsync(srv0, 3, 3); fut.get(); @@ -812,7 +831,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { mergeExchangeWaitVersion(srv0, 12); - IgniteInternalFuture fut = startGrids(srv0, 6, 2); + IgniteInternalFuture fut = startGridsAsync(srv0, 6, 2); fut.get(); @@ -903,7 +922,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { CountDownLatch latch = blockExchangeFinish(srvs, mode); - IgniteInternalFuture<?> fut = startGrids(srv0, srvs, 2); + IgniteInternalFuture<?> fut = startGridsAsync(srv0, srvs, 2); if (latch != null && !latch.await(WAIT_SECONDS, TimeUnit.SECONDS)) fail("Failed to wait for expected messages."); @@ -946,7 +965,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { CountDownLatch latch = blockExchangeFinish(srv0, srvs + 1, blockNodes, waitMsgNodes); - IgniteInternalFuture<?> fut = startGrids(srv0, srvs, startNodes); + IgniteInternalFuture<?> fut = startGridsAsync(srv0, srvs, startNodes); if (latch != null && !latch.await(WAIT_SECONDS, TimeUnit.SECONDS)) fail("Failed to wait for expected messages."); @@ -1370,6 +1389,47 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { /** * @param node Node. + * @param vers Expected exchange versions. + */ + private void checkExchanges(Ignite node, long... vers) { + IgniteKernal node0 = (IgniteKernal)node; + + List<AffinityTopologyVersion> expVers = new ArrayList<>(); + + for (long ver : vers) + expVers.add(new AffinityTopologyVersion(ver)); + + List<AffinityTopologyVersion> doneVers = new ArrayList<>(); + + List<GridDhtPartitionsExchangeFuture> futs = + node0.context().cache().context().exchange().exchangeFutures(); + + for (int i = futs.size() - 1; i >= 0; i--) { + GridDhtPartitionsExchangeFuture fut = futs.get(i); + + if (fut.exchangeDone() && fut.firstEvent().type() != EVT_DISCOVERY_CUSTOM_EVT) { + AffinityTopologyVersion resVer = fut.topologyVersion(); + + if (resVer != null) + doneVers.add(resVer); + } + } + + assertEquals(expVers, doneVers); + + for (CacheGroupContext grpCtx : node0.context().cache().cacheGroups()) { + for (AffinityTopologyVersion ver : grpCtx.affinity().cachedVersions()) { + if (ver.minorTopologyVersion() > 0) + continue; + + assertTrue("Unexpected version [ver=" + ver + ", exp=" + expVers + ']', + expVers.contains(ver)); + } + } + } + + /** + * @param node Node. * @param topVer Exchange version. * @throws Exception If failed. */ @@ -1386,13 +1446,15 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { } /** + * Sequentially starts nodes so that node name is consistent with node order. + * * @param node Some existing node. * @param startIdx Start node index. * @param cnt Number of nodes. * @return Start future. * @throws Exception If failed. */ - private IgniteInternalFuture startGrids(Ignite node, int startIdx, int cnt) throws Exception { + private IgniteInternalFuture startGridsAsync(Ignite node, int startIdx, int cnt) throws Exception { GridCompoundFuture fut = new GridCompoundFuture(); for (int i = 0; i < cnt; i++) {
