IGNITE-2801 Coordinator floods the network with full partition map exchange messages
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6a3d7248 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6a3d7248 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6a3d7248 Branch: refs/heads/ignite-2004 Commit: 6a3d724805e231edca2d8d72891f15a8a729bbc2 Parents: 7f9ee2d Author: Anton Vinogradov <[email protected]> Authored: Tue Mar 29 14:56:21 2016 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Tue Mar 29 14:58:14 2016 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 8 +- ...cingDelayedPartitionMapExchangeSelfTest.java | 14 +- .../GridCacheRebalancingSyncSelfTest.java | 186 ++++++++++++++++++- .../junits/common/GridCommonAbstractTest.java | 53 +++++- 4 files changed, 237 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6a3d7248/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 54580fd..6de10c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1264,13 +1264,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana break; } - // If not first preloading and no more topology events present, - // then we periodically refresh partition map. 
- if (!cctx.kernalContext().clientNode() && futQ.isEmpty() && preloadFinished) { - refreshPartitions(timeout); - + // If not first preloading and no more topology events present. + if (!cctx.kernalContext().clientNode() && futQ.isEmpty() && preloadFinished) timeout = cctx.gridConfig().getNetworkTimeout(); - } // After workers line up and before preloading starts we initialize all futures. if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/6a3d7248/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java index 2890fcb..2c47a1c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java @@ -24,6 +24,7 @@ import org.apache.ignite.cache.CacheRebalanceMode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.util.typedef.internal.U; @@ -99,7 +100,7 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest 
extends Gri * @throws Exception e. */ public void test() throws Exception { - startGrid(0); + IgniteKernal ignite = (IgniteKernal)startGrid(0); CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>(); @@ -114,26 +115,29 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends Gri startGrid(2); startGrid(3); - awaitPartitionMapExchange(true); + awaitPartitionMapExchange(true, true); for (int i = 0; i < 2; i++) { stopGrid(3); - awaitPartitionMapExchange(true); + awaitPartitionMapExchange(true, true); startGrid(3); - awaitPartitionMapExchange(true); + awaitPartitionMapExchange(true, true); } startGrid(4); - awaitPartitionMapExchange(true); + awaitPartitionMapExchange(true, true); assert rs.isEmpty(); record = true; + // Emulate latest GridDhtPartitionsFullMessages. + ignite.context().cache().context().exchange().scheduleResendPartitions(); + while (rs.size() < 3) { // N - 1 nodes. U.sleep(10); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6a3d7248/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java index e4ad66b..f1e5687 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java @@ -17,20 +17,40 @@ package org.apache.ignite.internal.processors.cache.distributed.rebalancing; +import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Random; 
+import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CacheRebalanceMode; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; +import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.IgniteSpiException; 
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; @@ -38,6 +58,9 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import static org.apache.ignite.cache.CacheMode.LOCAL; +import static org.apache.ignite.cache.CacheRebalanceMode.NONE; + /** * */ @@ -69,6 +92,12 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { /** */ private volatile boolean concurrentStartFinished3; + /** */ + private volatile boolean record = false; + + /** */ + private final ConcurrentHashMap<Class, AtomicInteger> map = new ConcurrentHashMap<>(); + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration iCfg = super.getConfiguration(gridName); @@ -76,6 +105,13 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder); ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true); + TcpCommunicationSpi commSpi = new CountingCommunicationSpi(); + + commSpi.setLocalPort(GridTestUtils.getNextCommPort(getClass())); + commSpi.setTcpNoDelay(true); + + iCfg.setCommunicationSpi(commSpi); + if (getTestGridName(10).equals(gridName)) iCfg.setClientMode(true); @@ -173,8 +209,9 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { log.info("<" + name + "> Checked " + (i + 1) * 100 / (TEST_SIZE) + "% entries. 
[count=" + TEST_SIZE + ", iteration=" + iter + ", cache=" + name + "]"); - assert ignite.cache(name).get(i) != null && ignite.cache(name).get(i).equals(i + name.hashCode() + iter) : - i + " value " + (i + name.hashCode() + iter) + " does not match (" + ignite.cache(name).get(i) + ")"; + assertTrue(i + " value " + (i + name.hashCode() + iter) + " does not match (" + ignite.cache(name).get(i) + ")", + ignite.cache(name).get(i) != null && ignite.cache(name).get(i).equals(i + name.hashCode() + iter)); + } } @@ -189,7 +226,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testSimpleRebalancing() throws Exception { - Ignite ignite = startGrid(0); + IgniteKernal ignite = (IgniteKernal)startGrid(0); generateData(ignite, 0, 0); @@ -202,19 +239,43 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { waitForRebalancing(0, 2); waitForRebalancing(1, 2); + awaitPartitionMapExchange(true, true); + + checkPartitionMapExchangeFinished(); + + checkPartitionMapMessagesAbsent(); + stopGrid(0); waitForRebalancing(1, 3); + awaitPartitionMapExchange(true, true); + + checkPartitionMapExchangeFinished(); + + checkPartitionMapMessagesAbsent(); + startGrid(2); waitForRebalancing(1, 4); waitForRebalancing(2, 4); + awaitPartitionMapExchange(true, true); + + checkPartitionMapExchangeFinished(); + + checkPartitionMapMessagesAbsent(); + stopGrid(2); waitForRebalancing(1, 5); + awaitPartitionMapExchange(true, true); + + checkPartitionMapExchangeFinished(); + + checkPartitionMapMessagesAbsent(); + long spend = (System.currentTimeMillis() - start) / 1000; checkData(grid(1), 0, 0); @@ -277,7 +338,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { concurrentStartFinished = true; - awaitPartitionMapExchange(true); + awaitPartitionMapExchange(true, true); checkSupplyContextMapIsEmpty(); @@ -348,12 +409,79 @@ public class GridCacheRebalancingSyncSelfTest extends 
GridCommonAbstractTest { Map map = U.field(supplier, "scMap"); synchronized (map) { - assert map.isEmpty(); + assertTrue(map.isEmpty()); } } } } + protected void checkPartitionMapExchangeFinished() { + for (Ignite g : G.allGrids()) { + IgniteKernal g0 = (IgniteKernal)g; + + for (IgniteCacheProxy<?, ?> c : g0.context().cache().jcaches()) { + CacheConfiguration cfg = c.context().config(); + + if (cfg.getCacheMode() != LOCAL && cfg.getRebalanceMode() != NONE) { + GridDhtCacheAdapter<?, ?> dht = dht(c); + + GridDhtPartitionTopology top = dht.topology(); + + List<GridDhtLocalPartition> locs = top.localPartitions(); + + for (GridDhtLocalPartition loc : locs) { + assertTrue("Wrong partition state, should be OWNING [state=" + loc.state() + "]", + loc.state() == GridDhtPartitionState.OWNING); + + Collection<ClusterNode> affNodes = + g0.affinity(cfg.getName()).mapPartitionToPrimaryAndBackups(loc.id()); + + assertTrue(affNodes.contains(g0.localNode())); + } + + for (Ignite remote : G.allGrids()) { + IgniteKernal remote0 = (IgniteKernal)remote; + + IgniteCacheProxy<?, ?> remoteC = remote0.context().cache().jcache(cfg.getName()); + + GridDhtCacheAdapter<?, ?> remoteDht = dht(remoteC); + + GridDhtPartitionTopology remoteTop = remoteDht.topology(); + + GridDhtPartitionMap2 pMap = remoteTop.partitionMap(true).get(((IgniteKernal)g).getLocalNodeId()); + + assertEquals(pMap.size(), locs.size()); + + for (Map.Entry entry : pMap.entrySet()) { + assertTrue("Wrong partition state, should be OWNING [state=" + entry.getValue() + "]", + entry.getValue() == GridDhtPartitionState.OWNING); + } + + for (GridDhtLocalPartition loc : locs) { + assertTrue(pMap.containsKey(loc.id())); + } + } + } + } + } + } + + protected void checkPartitionMapMessagesAbsent() throws IgniteInterruptedCheckedException { + map.clear(); + + record = true; + + U.sleep(30_000); + + record = false; + + AtomicInteger iF = map.get(GridDhtPartitionsFullMessage.class); + AtomicInteger iS = 
map.get(GridDhtPartitionsSingleMessage.class); + + assertTrue(iF == null || iF.get() == 1); // 1 message can be sent right after all checks passed. + assertTrue(iS == null); + } + /** {@inheritDoc} */ @Override protected long getTestTimeout() { return 10 * 60_000; @@ -446,7 +574,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { waitForRebalancing(3, 5, 1); waitForRebalancing(4, 5, 1); - awaitPartitionMapExchange(true); + awaitPartitionMapExchange(true, true); checkSupplyContextMapIsEmpty(); @@ -470,7 +598,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { waitForRebalancing(3, 6); waitForRebalancing(4, 6); - awaitPartitionMapExchange(true); + awaitPartitionMapExchange(true, true); checkSupplyContextMapIsEmpty(); @@ -480,7 +608,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { waitForRebalancing(3, 7); waitForRebalancing(4, 7); - awaitPartitionMapExchange(true); + awaitPartitionMapExchange(true, true); checkSupplyContextMapIsEmpty(); @@ -489,7 +617,11 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { waitForRebalancing(3, 8); waitForRebalancing(4, 8); - awaitPartitionMapExchange(true); + awaitPartitionMapExchange(true, true); + + checkPartitionMapExchangeFinished(); + + checkPartitionMapMessagesAbsent(); checkSupplyContextMapIsEmpty(); @@ -514,4 +646,40 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { stopAllGrids(); } + + /** + * + */ + private class CountingCommunicationSpi extends TcpCommunicationSpi { + /** {@inheritDoc} */ + @Override public void sendMessage(final ClusterNode node, final Message msg, + final IgniteInClosure<IgniteException> ackC) throws IgniteSpiException { + final Object msg0 = ((GridIoMessage)msg).message(); + + recordMessage(msg0); + + super.sendMessage(node, msg, ackC); + } + + /** + * @param msg + */ + private void recordMessage(Object msg) { + if (record) { + Class id = 
msg.getClass(); + + AtomicInteger ai = map.get(id); + + if (ai == null) { + ai = new AtomicInteger(); + + AtomicInteger oldAi = map.putIfAbsent(id, ai); + + (oldAi != null ? oldAi : ai).incrementAndGet(); + } + else + ai.incrementAndGet(); + } + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6a3d7248/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java index 4fcc1ed..e53ec56 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -68,6 +69,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedCache; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; import org.apache.ignite.internal.processors.cache.local.GridLocalCache; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; @@ -416,15 +418,18 @@ public abstract class GridCommonAbstractTest extends 
GridAbstractTest { */ @SuppressWarnings("BusyWait") protected void awaitPartitionMapExchange() throws InterruptedException { - awaitPartitionMapExchange(false); + awaitPartitionMapExchange(false, false); } /** * @param waitEvicts If {@code true} will wait for evictions finished. + * @param waitNode2PartUpdate If {@code true} will wait for nodes node2part info update finished. * @throws InterruptedException If interrupted. */ @SuppressWarnings("BusyWait") - protected void awaitPartitionMapExchange(boolean waitEvicts) throws InterruptedException { + protected void awaitPartitionMapExchange(boolean waitEvicts, boolean waitNode2PartUpdate) throws InterruptedException { + long timeout = 30_000; + for (Ignite g : G.allGrids()) { IgniteKernal g0 = (IgniteKernal)g; @@ -468,7 +473,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { GridDhtLocalPartition loc = top.localPartition(p, readyVer, false); if (affNodes.size() != owners.size() || !affNodes.containsAll(owners) || - (waitEvicts && loc != null && loc.state() == GridDhtPartitionState.RENTING)) { + (waitEvicts && loc != null && loc.state() != GridDhtPartitionState.OWNING)) { LT.warn(log(), null, "Waiting for topology map update [" + "grid=" + g.name() + ", cache=" + cfg.getName() + @@ -501,7 +506,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { if (i == 0) start = System.currentTimeMillis(); - if (System.currentTimeMillis() - start > 30_000) { + if (System.currentTimeMillis() - start > timeout) { U.dumpThreads(log); throw new IgniteException("Timeout of waiting for topology map update [" + @@ -526,6 +531,46 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { break; } } + + if (waitNode2PartUpdate) { + long start = System.currentTimeMillis(); + + boolean failed = true; + + while (failed) { + failed = false; + + for (GridDhtPartitionMap2 pMap : top.partitionMap(true).values()) { + if (failed) + break; + + for (Map.Entry entry : pMap.entrySet()) { + 
if (System.currentTimeMillis() - start > timeout) { + U.dumpThreads(log); + + throw new IgniteException("Timeout of waiting for partition state update [" + + "grid=" + g.name() + + ", cache=" + cfg.getName() + + ", cacheId=" + dht.context().cacheId() + + ", topVer=" + top.topologyVersion() + + ", locNode=" + g.cluster().localNode() + ']'); + } + + if (entry.getValue() != GridDhtPartitionState.OWNING) { + LT.warn(log(), null, + "Waiting for correct partition state, should be OWNING [state=" + + entry.getValue() + "]"); + + Thread.sleep(200); // Busy wait. + + failed = true; + + break; + } + } + } + } + } } } }
