Repository: ignite Updated Branches: refs/heads/ignite-5578 3488f9262 -> 8c80ef7b8
ignite-5578 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8c80ef7b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8c80ef7b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8c80ef7b Branch: refs/heads/ignite-5578 Commit: 8c80ef7b811b00f95f3198e1cc34563eaf9e2a62 Parents: 3488f92 Author: sboikov <[email protected]> Authored: Fri Jul 28 14:39:50 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri Jul 28 16:20:40 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 26 +- .../processors/cache/ExchangeContext.java | 27 ++- .../processors/cache/GridCacheIoManager.java | 39 +-- .../dht/GridClientPartitionTopology.java | 2 +- .../dht/GridDhtPartitionTopologyImpl.java | 5 +- .../dht/GridDhtTransactionalCacheAdapter.java | 2 +- .../dht/atomic/GridDhtAtomicCache.java | 2 +- .../GridDhtPartitionsExchangeFuture.java | 4 - .../cache/transactions/IgniteTxHandler.java | 13 +- .../CacheLateAffinityAssignmentTest.java | 243 +++++++++++-------- 10 files changed, 212 insertions(+), 151 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8c80ef7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 471f2ef..66924d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -448,20 +448,20 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap grp.topology().updateTopologyVersion(topFut, discoCache, -1, false); + grpHolder = new CacheGroupHolder1(grp, grpHolder.affinity()); + + grpHolders.put(grp.groupId(), grpHolder); + GridClientPartitionTopology clientTop = cctx.exchange().clearClientTopology(grp.groupId()); if (clientTop != null) { - grp.topology().update(topVer, + grp.topology().update(grpHolder.affinity().lastVersion(), clientTop.partitionMap(true), clientTop.updateCounters(false), Collections.<Integer>emptySet(), null); } - grpHolder = new CacheGroupHolder1(grp, grpHolder.affinity()); - - grpHolders.put(grp.groupId(), grpHolder); - assert grpHolder.affinity().lastVersion().equals(grp.affinity().lastVersion()); } } @@ -1662,6 +1662,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap aff.initialize(topVer, assign); } + + grpHolder.topology(fut).beforeExchange(fut, true); } else { List<GridDhtPartitionsExchangeFuture> exchFuts = cctx.exchange().exchangeFutures(); @@ -1673,14 +1675,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap final GridDhtPartitionsExchangeFuture prev = exchFuts.get(idx + 1); + assert prev.isDone() && prev.topologyVersion().compareTo(topVer) < 0 : prev; + if (log.isDebugEnabled()) { log.debug("Need initialize affinity on coordinator [" + "cacheGrp=" + desc.cacheOrGroupName() + "prevAff=" + prev.topologyVersion() + ']'); } - assert prev.topologyVersion().compareTo(topVer) < 0 : prev; - GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx, desc.groupId(), prev.topologyVersion(), @@ -1701,7 +1703,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap aff.calculate(topVer, fut.discoveryEvent(), fut.discoCache()); - affFut.onDone(fut.topologyVersion()); + affFut.onDone(topVer); } }); @@ -1714,13 +1716,15 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (newAff) { GridAffinityAssignmentCache aff = grpHolder.affinity(); - if (!aff.lastVersionEquals(fut.topologyVersion())) { - List<List<ClusterNode>> assign = aff.calculate(fut.topologyVersion(), + if (!aff.lastVersionEquals(topVer)) { + List<List<ClusterNode>> assign = aff.calculate(topVer, fut.discoveryEvent(), fut.discoCache()); - aff.initialize(fut.topologyVersion(), assign); + aff.initialize(topVer, assign); } + + grpHolder.topology(fut).beforeExchange(fut, true); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/8c80ef7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java index 70c896e..1a5fbb4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache; import java.util.HashSet; import java.util.Set; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; @@ -32,6 +33,9 @@ import static org.apache.ignite.internal.processors.cache.GridCachePartitionExch */ public class ExchangeContext { /** */ + public static final String IGNITE_EXCHANGE_COMPATIBILITY_MODE = "IGNITE_EXCHANGE_COMPATIBILITY_MODE"; + + /** */ private Set<Integer> requestGrpsAffOnJoin; /** */ @@ -43,22 +47,35 @@ public class ExchangeContext { /** */ private final ExchangeDiscoveryEvents evts; + /** */ + private final boolean compatibilityNode = IgniteSystemProperties.getBoolean(IGNITE_EXCHANGE_COMPATIBILITY_MODE, false); + /** * @param fut Exchange future. */ public ExchangeContext(GridDhtPartitionsExchangeFuture fut) { - int protocolVer = exchangeProtocolVersion( - fut.discoCache().minimumNodeVersion()); + int protocolVer = exchangeProtocolVersion(fut.discoCache().minimumNodeVersion()); - fetchAffOnJoin = protocolVer == 1; + if (compatibilityNode) { + fetchAffOnJoin = true; - merge = protocolVer > 1 && fut.discoveryEvent().type() != EVT_DISCOVERY_CUSTOM_EVT; + merge = false; + } + else { + fetchAffOnJoin = protocolVer == 1; + + merge = protocolVer > 1 && fut.discoveryEvent().type() != EVT_DISCOVERY_CUSTOM_EVT; + } evts = new ExchangeDiscoveryEvents(fut); } + /** + * @param node Node. + * @return {@code True} if node supports exchange merge protocol. + */ boolean supportsMergeExchanges(ClusterNode node) { - return exchangeProtocolVersion(node.version()) > 1; + return !compatibilityNode && exchangeProtocolVersion(node.version()) > 1; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/8c80ef7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 981c6e2..91872e8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -1051,27 +1051,34 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { throw e; } finally { - // Reset thread local context. - cctx.tm().resetContext(); + onMessageProcessed(msg); + } + } - GridCacheMvccManager mvcc = cctx.mvcc(); + /** + * @param msg Message. + */ + public void onMessageProcessed(GridCacheMessage msg) { + // Reset thread local context. + cctx.tm().resetContext(); - if (mvcc != null) - mvcc.contextReset(); + GridCacheMvccManager mvcc = cctx.mvcc(); - // Unwind eviction notifications. - if (msg instanceof IgniteTxStateAware) { - IgniteTxState txState = ((IgniteTxStateAware)msg).txState(); + if (mvcc != null) + mvcc.contextReset(); - if (txState != null) - txState.unwindEvicts(cctx); - } - else if (msg instanceof GridCacheIdMessage) { - GridCacheContext ctx = cctx.cacheContext(((GridCacheIdMessage)msg).cacheId()); + // Unwind eviction notifications. + if (msg instanceof IgniteTxStateAware) { + IgniteTxState txState = ((IgniteTxStateAware)msg).txState(); - if (ctx != null) - CU.unwindEvicts(ctx); - } + if (txState != null) + txState.unwindEvicts(cctx); + } + else if (msg instanceof GridCacheIdMessage) { + GridCacheContext ctx = cctx.cacheContext(((GridCacheIdMessage)msg).cacheId()); + + if (ctx != null) + CU.unwindEvicts(ctx); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/8c80ef7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 975385f..37f4fd8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -793,7 +793,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { GridDhtPartitionMap cur = node2part.get(parts.nodeId()); if (force) { - if (cur != null) + if (cur != null && cur.topologyVersion().initialized()) parts.updateSequence(cur.updateSequence(), cur.topologyVersion()); } else if (isStaleUpdate(cur, parts)) { http://git-wip-us.apache.org/repos/asf/ignite/blob/8c80ef7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 531aa51..aa2f0f7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -1498,7 +1498,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { GridDhtPartitionMap cur = node2part.get(parts.nodeId()); if (force) { - if (cur != null) + if (cur != null && cur.topologyVersion().initialized()) parts.updateSequence(cur.updateSequence(), cur.topologyVersion()); } else if (isStaleUpdate(cur, parts)) { @@ -1600,7 +1600,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { lock.writeLock().lock(); try { - assert assignment.topologyVersion().equals(((GridDhtPartitionsExchangeFuture)topReadyFut).context().events().topologyVersion()); + assert !(topReadyFut instanceof GridDhtPartitionsExchangeFuture) || + assignment.topologyVersion().equals(((GridDhtPartitionsExchangeFuture)topReadyFut).context().events().topologyVersion()); readyTopVer = lastTopChangeVer = assignment.topologyVersion(); http://git-wip-us.apache.org/repos/asf/ignite/blob/8c80ef7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index daedda8..d39afb1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -846,7 +846,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach top = topology(); - topology().readLock(); + top.readLock(); } try { http://git-wip-us.apache.org/repos/asf/ignite/blob/8c80ef7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 3e43e2b..3f97cae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1902,7 +1902,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { ctx.closures().runLocalWithThreadPolicy(thread, new Runnable() { @Override public void run() { - updateAllAsyncInternal0(node, req, completionCb); + updateAllAsyncInternal(node, req, completionCb); } }); } http://git-wip-us.apache.org/repos/asf/ignite/blob/8c80ef7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 1cd72e9..d5b7846 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -2178,9 +2178,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (grp.isLocal() || cacheGroupStopping(grp.groupId())) continue; - if (resTopVer.equals(new AffinityTopologyVersion(3, 0)) && "c1".equals(grp.cacheOrGroupName())) - System.out.println(); - grp.topology().beforeExchange(this, true); } @@ -2974,7 +2971,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte assert newCrdFut != null; - newCrdFut.init(GridDhtPartitionsExchangeFuture.this); newCrdFut.listen(new CI1<IgniteInternalFuture>() { http://git-wip-us.apache.org/repos/asf/ignite/blob/8c80ef7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 753f8d8..6fc5e4c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -25,7 +25,6 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; -import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -376,8 +375,11 @@ public class IgniteTxHandler { GridDhtTopologyFuture topFut = top.topologyVersionFuture(); - if (!topFut.isDone()) + if (!topFut.isDone()) { + top.readUnlock(); + return null; + } } try { @@ -549,7 +551,12 @@ public class IgniteTxHandler { @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { ctx.kernalContext().closure().runLocalWithThreadPolicy(thread, new Runnable() { @Override public void run() { - processNearTxPrepareRequest0(node, req); + try { + processNearTxPrepareRequest0(node, req); + } + finally { + ctx.io().onMessageProcessed(req); + } } }); } http://git-wip-us.apache.org/repos/asf/ignite/blob/8c80ef7b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java index 2350ed0..4bb7554 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java @@ -100,6 +100,7 @@ import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC; import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.internal.processors.cache.ExchangeContext.IGNITE_EXCHANGE_COMPATIBILITY_MODE; /** * @@ -381,52 +382,59 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testAffinitySimpleNoCacheOnCoordinator2() throws Exception { - cacheC = new IgniteClosure<String, CacheConfiguration[]>() { - @Override public CacheConfiguration[] apply(String igniteInstanceName) { - if (igniteInstanceName.equals(getTestIgniteInstanceName(1)) || - igniteInstanceName.equals(getTestIgniteInstanceName(2))) - return null; + System.setProperty(IGNITE_EXCHANGE_COMPATIBILITY_MODE, "true"); - return new CacheConfiguration[]{cacheConfiguration()}; - } - }; + try { + cacheC = new IgniteClosure<String, CacheConfiguration[]>() { + @Override public CacheConfiguration[] apply(String igniteInstanceName) { + if (igniteInstanceName.equals(getTestIgniteInstanceName(1)) || + igniteInstanceName.equals(getTestIgniteInstanceName(2))) + return null; + + return new CacheConfiguration[]{cacheConfiguration()}; + } + }; - cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(1), getTestIgniteInstanceName(2))); + cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(1), getTestIgniteInstanceName(2))); - startServer(0, 1); - startServer(1, 2); - startServer(2, 3); - startServer(3, 4); + startServer(0, 1); + startServer(1, 2); + startServer(2, 3); + startServer(3, 4); - for (int i = 0; i < 4; i++) { - TestRecordingCommunicationSpi spi = - (TestRecordingCommunicationSpi)ignite(i).configuration().getCommunicationSpi(); + for (int i = 0; i < 4; i++) { + TestRecordingCommunicationSpi spi = + (TestRecordingCommunicationSpi)ignite(i).configuration().getCommunicationSpi(); - // Prevent exchange finish while node0 or node1 is coordinator. - spi.blockMessages(GridDhtPartitionsSingleMessage.class, ignite(0).name()); - spi.blockMessages(GridDhtPartitionsSingleMessage.class, ignite(1).name()); - } + // Prevent exchange finish while node0 or node1 is coordinator. + spi.blockMessages(GridDhtPartitionsSingleMessage.class, ignite(0).name()); + spi.blockMessages(GridDhtPartitionsSingleMessage.class, ignite(1).name()); + } - stopGrid(0); - stopGrid(1); + stopGrid(0); + stopGrid(1); - calculateAffinity(5); - calculateAffinity(6); + calculateAffinity(5); + calculateAffinity(6); - checkAffinity(2, topVer(6, 0), true); + checkAffinity(2, topVer(6, 0), true); - assertNull(((IgniteKernal)ignite(2)).context().cache().internalCache(CACHE_NAME1)); - assertNotNull(((IgniteKernal)ignite(3)).context().cache().internalCache(CACHE_NAME1)); + assertNull(((IgniteKernal)ignite(2)).context().cache().internalCache(CACHE_NAME1)); + assertNotNull(((IgniteKernal)ignite(3)).context().cache().internalCache(CACHE_NAME1)); - assertNotNull(ignite(2).cache(CACHE_NAME1)); + assertNotNull(ignite(2).cache(CACHE_NAME1)); - checkAffinity(2, topVer(6, 0), true); + checkAffinity(2, topVer(6, 0), true); - startServer(4, 7); + startServer(4, 7); - checkAffinity(3, topVer(7, 0), false); + checkAffinity(3, topVer(7, 0), false); - checkAffinity(3, topVer(7, 1), true); + checkAffinity(3, topVer(7, 1), true); + } + finally { + System.clearProperty(IGNITE_EXCHANGE_COMPATIBILITY_MODE); + } } /** @@ -624,38 +632,45 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testNodeLeaveExchangeWaitAffinityMessage() throws Exception { - Ignite ignite0 = startServer(0, 1); + System.setProperty(IGNITE_EXCHANGE_COMPATIBILITY_MODE, "true"); - startServer(1, 2); + try { + Ignite ignite0 = startServer(0, 1); - startServer(2, 3); + startServer(1, 2); - checkAffinity(3, topVer(3, 1), true); + startServer(2, 3); - checkOrderCounters(3, topVer(3, 1)); + checkAffinity(3, topVer(3, 1), true); - startClient(3, 4); + checkOrderCounters(3, topVer(3, 1)); - checkAffinity(4, topVer(4, 0), true); + startClient(3, 4); - TestTcpDiscoverySpi discoSpi = (TestTcpDiscoverySpi)ignite0.configuration().getDiscoverySpi(); + checkAffinity(4, topVer(4, 0), true); - discoSpi.blockCustomEvent(); + TestTcpDiscoverySpi discoSpi = (TestTcpDiscoverySpi)ignite0.configuration().getDiscoverySpi(); - stopGrid(1); + discoSpi.blockCustomEvent(); - List<IgniteInternalFuture<?>> futs = affFutures(3, topVer(5, 0)); + stopGrid(1); - U.sleep(1000); + List<IgniteInternalFuture<?>> futs = affFutures(3, topVer(5, 0)); - for (IgniteInternalFuture<?> fut : futs) - assertFalse(fut.isDone()); + U.sleep(1000); - discoSpi.stopBlock(); + for (IgniteInternalFuture<?> fut : futs) + assertFalse(fut.isDone()); - checkAffinity(3, topVer(5, 0), false); + discoSpi.stopBlock(); - checkOrderCounters(3, topVer(5, 0)); + checkAffinity(3, topVer(5, 0), false); + + checkOrderCounters(3, topVer(5, 0)); + } + finally { + System.clearProperty(IGNITE_EXCHANGE_COMPATIBILITY_MODE); + } } /** @@ -1044,39 +1059,46 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { * @throws Exception If failed. */ private void nodeLeftExchangeCoordinatorLeave(int nodes) throws Exception { - assert nodes > 2 : nodes; + System.setProperty(IGNITE_EXCHANGE_COMPATIBILITY_MODE, "true"); - long topVer = 0; + try { + assert nodes > 2 : nodes; - for (int i = 0; i < nodes; i++) - startServer(i, ++topVer); + long topVer = 0; - Ignite ignite1 = grid(1); + for (int i = 0; i < nodes; i++) + startServer(i, ++topVer); - checkAffinity(nodes, topVer(nodes, 1), true); + Ignite ignite1 = grid(1); - TestRecordingCommunicationSpi spi1 = - (TestRecordingCommunicationSpi)ignite1.configuration().getCommunicationSpi(); + checkAffinity(nodes, topVer(nodes, 1), true); - // Prevent exchange finish while node0 is coordinator. - spi1.blockMessages(GridDhtPartitionsSingleMessage.class, ignite(0).name()); + TestRecordingCommunicationSpi spi1 = + (TestRecordingCommunicationSpi)ignite1.configuration().getCommunicationSpi(); - stopNode(2, ++topVer); // New exchange started. + // Prevent exchange finish while node0 is coordinator. + spi1.blockMessages(GridDhtPartitionsSingleMessage.class, ignite(0).name()); - stopGrid(0); // Stop coordinator while exchange in progress. + stopNode(2, ++topVer); // New exchange started. - Map<String, List<List<ClusterNode>>> aff = checkAffinity(nodes - 2, topVer(topVer, 0), false); + stopGrid(0); // Stop coordinator while exchange in progress. - topVer++; + Map<String, List<List<ClusterNode>>> aff = checkAffinity(nodes - 2, topVer(topVer, 0), false); - boolean primaryChanged = calculateAffinity(nodes + 2, false, aff); + topVer++; - checkAffinity(nodes - 2, topVer(topVer, 0), !primaryChanged); + boolean primaryChanged = calculateAffinity(nodes + 2, false, aff); - if (primaryChanged) - checkAffinity(nodes - 2, topVer(topVer, 1), true); + checkAffinity(nodes - 2, topVer(topVer, 0), !primaryChanged); - awaitPartitionMapExchange(); + if (primaryChanged) + checkAffinity(nodes - 2, topVer(topVer, 1), true); + + awaitPartitionMapExchange(); + } + finally { + System.clearProperty(IGNITE_EXCHANGE_COMPATIBILITY_MODE); + } } /** @@ -1184,69 +1206,76 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testDelayAssignmentAffinityChanged2() throws Exception { - Ignite ignite0 = startServer(0, 1); + System.setProperty(IGNITE_EXCHANGE_COMPATIBILITY_MODE, "true"); - TestTcpDiscoverySpi discoSpi0 = - (TestTcpDiscoverySpi)ignite0.configuration().getDiscoverySpi(); - TestRecordingCommunicationSpi commSpi0 = - (TestRecordingCommunicationSpi)ignite0.configuration().getCommunicationSpi(); + try { + Ignite ignite0 = startServer(0, 1); - startClient(1, 2); + TestTcpDiscoverySpi discoSpi0 = + (TestTcpDiscoverySpi)ignite0.configuration().getDiscoverySpi(); + TestRecordingCommunicationSpi commSpi0 = + (TestRecordingCommunicationSpi)ignite0.configuration().getCommunicationSpi(); - checkAffinity(2, topVer(2, 0), true); + startClient(1, 2); - startServer(2, 3); + checkAffinity(2, topVer(2, 0), true); - checkAffinity(3, topVer(3, 1), false); + startServer(2, 3); - discoSpi0.blockCustomEvent(); + checkAffinity(3, topVer(3, 1), false); - stopNode(2, 4); + discoSpi0.blockCustomEvent(); - discoSpi0.waitCustomEvent(); + stopNode(2, 4); - blockSupplySend(commSpi0, CACHE_NAME1); + discoSpi0.waitCustomEvent(); - final IgniteInternalFuture<?> startedFuture = multithreadedAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - startServer(3, 5); + blockSupplySend(commSpi0, CACHE_NAME1); - return null; - } - }, 1, "server-starter"); + final IgniteInternalFuture<?> startedFuture = multithreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + startServer(3, 5); - Thread.sleep(2_000); + return null; + } + }, 1, "server-starter"); - discoSpi0.stopBlock(); + Thread.sleep(2_000); - boolean started = GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return startedFuture.isDone(); - } - }, 10_000); + discoSpi0.stopBlock(); - if (!started) - startedFuture.cancel(); + boolean started = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return startedFuture.isDone(); + } + }, 10_000); - assertTrue(started); + if (!started) + startedFuture.cancel(); - checkAffinity(3, topVer(5, 0), false); + assertTrue(started); - checkNoExchange(3, topVer(5, 1)); + checkAffinity(3, topVer(5, 0), false); - commSpi0.stopBlock(); + checkNoExchange(3, topVer(5, 1)); - checkAffinity(3, topVer(5, 1), true); + commSpi0.stopBlock(); - long nodeJoinTopVer = grid(3).context().discovery().localJoinEvent().topologyVersion(); + checkAffinity(3, topVer(5, 1), true); - assertEquals(5, nodeJoinTopVer); + long nodeJoinTopVer = grid(3).context().discovery().localJoinEvent().topologyVersion(); - List<GridDhtPartitionsExchangeFuture> exFutures = grid(3).context().cache().context().exchange().exchangeFutures(); + assertEquals(5, nodeJoinTopVer); - for (GridDhtPartitionsExchangeFuture f : exFutures) { - //Shouldn't contains staled futures. - assertTrue(f.initialVersion().topologyVersion() >= nodeJoinTopVer); + List<GridDhtPartitionsExchangeFuture> exFutures = grid(3).context().cache().context().exchange().exchangeFutures(); + + for (GridDhtPartitionsExchangeFuture f : exFutures) { + //Shouldn't contains staled futures. + assertTrue(f.initialVersion().topologyVersion() >= nodeJoinTopVer); + } + } + finally { + System.clearProperty(IGNITE_EXCHANGE_COMPATIBILITY_MODE); } }
