IGNITE-2801 Coordinator floods network with partitions full map exchange messages
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cb56a4aa Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cb56a4aa Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cb56a4aa Branch: refs/heads/ignite-2801 Commit: cb56a4aa58f4b610aa603f8c27ea2715803b1847 Parents: fa356e3 Author: Anton Vinogradov <[email protected]> Authored: Mon Mar 21 17:19:36 2016 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Mon Mar 21 17:19:36 2016 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 8 +- ...RabalancingPartitionMapExchangeSelfTest.java | 190 +++++++++++++++++++ .../GridCacheRebalancingSyncSelfTest.java | 7 +- 3 files changed, 196 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/cb56a4aa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 1681f2f..caa32d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1264,13 +1264,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana break; } - // If not first preloading and no more topology events present, - // then we periodically refresh partition map. - if (!cctx.kernalContext().clientNode() && futQ.isEmpty() && preloadFinished) { - refreshPartitions(timeout); - + if (!cctx.kernalContext().clientNode() && futQ.isEmpty() && preloadFinished) timeout = cctx.gridConfig().getNetworkTimeout(); - } + // After workers line up and before preloading starts we initialize all futures. if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/cb56a4aa/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingPartitionMapExchangeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingPartitionMapExchangeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingPartitionMapExchangeSelfTest.java new file mode 100644 index 0000000..8cf11cc --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingPartitionMapExchangeSelfTest.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.rebalancing; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutput; +import java.io.ObjectOutputStream; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheRebalanceMode; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class GridCacheRabalancingPartitionMapExchangeSelfTest extends GridCommonAbstractTest { + /** */ + protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private final ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<>(); + + /** */ + private volatile boolean record = false; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration iCfg = super.getConfiguration(gridName); + + TcpCommunicationSpi commSpi = new CountingCommunicationSpi(); + + commSpi.setLocalPort(GridTestUtils.getNextCommPort(getClass())); + commSpi.setTcpNoDelay(true); + + iCfg.setCommunicationSpi(commSpi); + + return iCfg; + } + + /** + * + */ + public class CountingCommunicationSpi extends TcpCommunicationSpi { + /** {@inheritDoc} */ + @Override public void sendMessage(final ClusterNode node, final Message msg, + final IgniteInClosure<IgniteException> ackC) throws IgniteSpiException { + final Object msg0 = ((GridIoMessage)msg).message(); + + recordMessage(msg0); + + super.sendMessage(node, msg, ackC); + } + } + + /** + * @param msg + */ + private void recordMessage(Object msg) { + if (record) { + String id = msg.getClass().toString(); + + if (msg instanceof GridDhtPartitionsFullMessage) + id += ((GridDhtPartitionsFullMessage)msg).exchangeId(); + + int size = 0; + + try { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutput out = new ObjectOutputStream(bos); + out.writeObject(msg); + + size = bos.toByteArray().length; + } + catch (IOException e) { + } + + AtomicInteger ai = map.get("cnt " + id); + + if (ai == null) { + ai = new AtomicInteger(); + + AtomicInteger oldAi = map.putIfAbsent("cnt " + id, ai); + + (oldAi != null ? oldAi : ai).addAndGet(size); + } + else + ai.addAndGet(size); + + ai = map.get("size" + id); + + if (ai == null) { + ai = new AtomicInteger(); + + AtomicInteger oldAi = map.putIfAbsent("size" + id, ai); + + (oldAi != null ? oldAi : ai).incrementAndGet(); + } + else + ai.incrementAndGet(); + } + } + + @Override protected long getTestTimeout() { + return Long.MAX_VALUE; + } + + /** + * @throws Exception e. + */ + public void test() throws Exception { + record = false; + + startGrids(10); + + awaitPartitionMapExchange(true); + + for (int i = 0; i < 10; i++) { + CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>(); + + cfg.setName("cache" + i); + cfg.setCacheMode(CacheMode.PARTITIONED); + cfg.setRebalanceMode(CacheRebalanceMode.SYNC); + cfg.setBackups(1); + + ignite(0).getOrCreateCache(cfg); + } + + awaitPartitionMapExchange(true); + + U.sleep(60_000); + + System.out.println("--------------------------TESTING--------------------------"); + + record = true; + + U.sleep(60_000); + + record = false; + + for (Map.Entry entry : map.entrySet()) { + System.out.println(entry.getKey().toString() + " ------ " + entry.getValue().toString()); + } + + IgniteKernal ignite = ((IgniteKernal)grid(0).cluster().forOldest().ignite()); + + ignite.dumpDebugInfo(); + + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + +} http://git-wip-us.apache.org/repos/asf/ignite/blob/cb56a4aa/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java index 4ee080f..ceba0e1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java @@ -172,8 +172,9 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { log.info("<" + name + "> Checked " + (i + 1) * 100 / (TEST_SIZE) + "% entries. [count=" + TEST_SIZE + ", iteration=" + iter + ", cache=" + name + "]"); - assert ignite.cache(name).get(i) != null && ignite.cache(name).get(i).equals(i + name.hashCode() + iter) : - i + " value " + (i + name.hashCode() + iter) + " does not match (" + ignite.cache(name).get(i) + ")"; + assertTrue(i + " value " + (i + name.hashCode() + iter) + " does not match (" + ignite.cache(name).get(i) + ")", + ignite.cache(name).get(i) != null && ignite.cache(name).get(i).equals(i + name.hashCode() + iter)); + } } @@ -340,7 +341,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { Map map = U.field(supplier, "scMap"); synchronized (map) { - assert map.isEmpty(); + assertTrue(map.isEmpty()); } } }
